﻿#include "lua_helper.hh"

#include "argument/export_argument.hh"
#include "box/service_box.hh"
#include "call_tracer/call_tracer_util.hh"
#include "config/box_config.hh"
#include "config/export_config.hh"
#include "console/export_console.hh"
#include "detail/box_alloc.hh"
#include "detail/debug_server_impl.hh"
#include "detail/lang_impl.hh"
#include "hotfix.lua.inl"
#include "http/export_http.hh"
#include "include/json/json.h"
#include "log_history.hh"
#include "lua_debug.hh"
#include "lua_module.lua.inl"
#include "redis/export_redis.hh"
#include "root/rpc_probe.h"
#include "root/rpc_proxy.h"
#include "root/rpc_proxy_creator_interface.h"
#include "root/rpc_stub.h"
#include "root/rpc_transport.h"
#include "time/export_time.hh"
#include "util/os_util.hh"
#include "util/singleton.hh"
#include "util/string_util.hh"
#include "util/time_util.hh"
#include <google/protobuf/compiler/importer.h>
#include <google/protobuf/dynamic_message.h>
#include <google/protobuf/message.h>
#include <google/protobuf/reflection.h>

#include <filesystem>
#include <fstream>
#include <list>
#include <sstream>
#include <string>
#include <vector>

#if defined(_WIN32) || defined(_WIN64)
// 禁用WINDOWS API宏
#ifdef GetMessage
#undef GetMessage
#endif // GetMessage
#endif // defined(_WIN32) || defined(_WIN64)

namespace kratos {
namespace lua {

/**
 * @brief lua内存分配器，接入容器的内存分配器
 * @param ud 未使用
 * @param ptr 内存地址
 * @param osize 原来的长度，字节
 * @param nsize 需要的长度，字节
 * @return 新分配的地址
 */
static void *lua_mem_alloc(void * /*ud*/, void *ptr, size_t osize,
                           size_t nsize) {
  if (!nsize) {
    kratos::service::box_free(ptr);
    return nullptr;
  } else {
    size_t size = 0;
    if (osize == 0) {
      size = nsize;
    } else if (osize > nsize) {
      size = nsize;
    } else if (osize <= nsize) {
      size = nsize;
    }
    auto *new_mem = (void *)kratos::service::box_malloc(size);
    if (!ptr) {
      return new_mem;
    } else {
      auto copy_size = osize > size ? size : osize;
      memcpy(new_mem, ptr, copy_size);
      kratos::service::box_free(ptr);
    }
    return new_mem;
  }
}

/**
 * 记录调用信息-发送服务调用返回
 */
static auto on_lua_stub_call_return(service::ServiceBox *box,
                                    rpc::StubCallPtr stub_call,
                                    const MethodType *method_type_info,
                                    rpc::TransportPtr transport,
                                    ProtobufMessage *msg) -> void {
  if (!box || !stub_call || !method_type_info || !transport || !msg) {
    return;
  }
  if (!set_trace_info(*msg, stub_call->getTraceID(), stub_call->getSpanID(),
                      stub_call->getParentSpanID())) {
    return;
  }
  if (stub_call->getRpc()->getRpcProbe()) {
    stub_call->getRpc()->getRpcProbe()->on_stub_call_return_sent(
        stub_call->getTraceID(), stub_call->getSpanID(),
        stub_call->getParentSpanID(), transport, method_type_info->service_name,
        method_type_info->method_name, msg->DebugString());
  }
}
/**
 * 记录调用信息-调用请求到达
 */
static auto on_lua_stub_call_arrived(service::ServiceBox *box,
                                     rpc::StubCallPtr stub_call,
                                     const MethodType *method_type_info,
                                     rpc::TransportPtr transport,
                                     ProtobufMessage *msg) -> void {
  if (!box || !stub_call || !method_type_info || !transport || !msg) {
    return;
  }
  std::string trace_id;
  std::uint64_t span_id{0};
  std::uint64_t parent_span_id{0};
  // 设置rpc的trace_id, span_id, parent_span_id
  if (get_trace_info(*msg, trace_id, span_id, parent_span_id)) {
    stub_call->getRpc()->setCurTraceID(trace_id);
    stub_call->getRpc()->setCurSpanID(span_id);
    stub_call->getRpc()->setCurParentSpanID(parent_span_id);
    stub_call->setTraceID(trace_id);
    stub_call->setSpanID(span_id);
    stub_call->setParentSpanID(parent_span_id);
    box->get_rpc_probe()->on_stub_call_arrived(
        trace_id, span_id, parent_span_id, transport,
        method_type_info->service_name, method_type_info->method_name,
        msg->DebugString());
  }
}
/**
 * 记录调用信息-发送远程调用请求
 */
static auto on_lua_proxy_sent(service::ServiceBox *box, ProxyCallLua *proxy_ptr,
                              const MethodType *method_type_info,
                              rpc::TransportPtr transport, ProtobufMessage *msg)
    -> void;
/**
 * 记录调用信息-接收到服务调用返回
 */
static auto on_lua_proxy_return(service::ServiceBox *box,
                                rpc::ProxyCall *proxy_ptr,
                                const MethodType *method_type_info,
                                rpc::TransportPtr transport,
                                ProtobufMessage *msg) -> void;

using namespace klogger;

auto MsgFactory::load(const std::string &idl_json_file,
                      const std::string &idl_proto_root_dir,
                      const std::string &file_name) -> bool {
  Json::Value root;
  std::string error;
  //
  // 获取JSON对象
  //
  auto ret = util::get_json_root_from_file(idl_json_file, root, error);
  if (!ret) {
    if (box_) {
      std::string json_error;
      json_error = "Parse JSON file failed: " + idl_json_file;
      box_->write_log(lang::LangID::LANG_LUA_ERROR, Logger::FATAL,
                      json_error.c_str());
    }
    return false;
  }
  ProtobufDiskSourceTree source_tree;
  //
  // 设置source tree的路径
  //
  source_tree.MapPath("proto", idl_proto_root_dir);
  if (!importer_) {
    importer_ = make_unique_pool_ptr<ProtobufImporter>(&source_tree, nullptr);
  }
  //
  // 导入文件描述
  //
  const auto *file_descriptor = importer_->Import("proto/" + file_name);
  if (!file_descriptor) {
    if (box_) {
      std::string error_str;
      error_str = "Cannot import PROTO file[" + file_name + "]";
      box_->write_log(lang::LangID::LANG_LUA_ERROR, Logger::FATAL,
                      error_str.c_str());
    }
    return false;
  }
  return load(root, file_descriptor);
}

auto MsgFactory::load(const std::string &idl_json_root_dir,
                      const std::string &idl_proto_root_dir) -> bool {
  //
  // 获取所有的JSON文件
  //
  std::vector<std::string> proto_file_names;
  if (!util::get_file_in_directory(idl_proto_root_dir, ".proto",
                                   proto_file_names)) {
    if (box_) {
      std::string error;
      error = "Retrive .proto file failed in[" + idl_proto_root_dir + "]";
      box_->write_log(lang::LangID::LANG_LUA_ERROR, Logger::FATAL,
                      error.c_str());
    }
    return false;
  }
  ProtobufDiskSourceTree source_tree;
  //
  // 设置source tree的路径
  //
  source_tree.MapPath("proto", idl_proto_root_dir);
  if (!importer_) {
    importer_ = make_unique_pool_ptr<ProtobufImporter>(&source_tree, nullptr);
  }
  for (const auto &file_name : proto_file_names) {
    //
    // 没有扩展名的文件名
    //
    auto file_name_no_ext =
        std::filesystem::path(file_name).stem().stem().string();
    //
    // source tree虚拟文件路径
    //
    auto virtual_proto_file_path =
        "proto/" + std::filesystem::path(file_name).filename().string();
    const auto *descriptor = importer_->Import(virtual_proto_file_path);
    if (!descriptor) {
      if (box_) {
        std::string error_str;
        error_str = "Cannot import PROTO file[" + file_name + "]";
        box_->write_log(lang::LangID::LANG_LUA_ERROR, Logger::FATAL,
                        error_str.c_str());
      }
      return false;
    }
    if (box_) {
      box_->write_log_line(klogger::Logger::INFORMATION,
                           "[lua]Import PROTO file[" + file_name + "]");
    }
    std::string json_file_path = util::complete_path(
        idl_json_root_dir, file_name_no_ext + ".idl.cpp.json");
    Json::Value root;
    std::string error;
    auto ret = util::get_json_root_from_file(json_file_path, root, error);
    if (!ret) {
      if (box_) {
        std::string json_error;
        json_error = "Parse JSON file failed[" + json_file_path + "]";
        box_->write_log(lang::LangID::LANG_LUA_ERROR, Logger::FATAL,
                        json_error.c_str());
      }
      return false;
    } else {
      if (box_) {
        box_->write_log_line(klogger::Logger::INFORMATION,
                             "[lua]Import JSON file[" + json_file_path + "]");
      }
    }
    if (!load(root, descriptor)) {
      if (box_) {
        std::string json_error;
        json_error = "Parse protobuf file failed[" + file_name + "]";
        box_->write_log(lang::LangID::LANG_LUA_ERROR, Logger::FATAL,
                        json_error.c_str());
      }
      return false;
    }
  }
  return true;
}

auto MsgFactory::load(const Json::Value &idl_json_root,
                      const ProtobufFileDescriptor *file_descriptor) -> bool {
  if (!file_descriptor) {
    return false;
  }
  for (const auto &service : idl_json_root["services"]) {
    //
    // 校验
    //
    if (!service.isMember("name") || !service.isMember("methods")) {
      if (box_) {
        std::string json_error;
        json_error =
            "JSON format invalid, service need 'name' and 'methods' attribute";
        box_->write_log(lang::LangID::LANG_LUA_ERROR, Logger::FATAL,
                        json_error.c_str());
      }
    }
    auto service_name = service["name"].asString();
    for (const auto &method : service["methods"]) {
      //
      // PB内message的名字, 对应方法的参数, {service name}_{method name}_args
      //
      auto arg_name = file_descriptor->package() + "." +
                      service["name"].asString() + "_" +
                      method["name"].asString() + "_args";
      //
      // PB内message的名字, 对应方法的返回值, {service name}_{method name}_ret
      //
      auto ret_name = file_descriptor->package() + "." +
                      service["name"].asString() + "_" +
                      method["name"].asString() + "_ret";
      const auto *req_desc = importer_->pool()->FindMessageTypeByName(arg_name);
      const auto *ack_desc = importer_->pool()->FindMessageTypeByName(ret_name);
      auto oneway = method.isMember("oneway");
      //
      // 如果返回值不是void类型, 则认为是有返回值的
      //
      auto has_ret_value = (method["retType"]["IdlType"].asString() != "void");
      ProtobufMessage *arg_msg = nullptr;
      ProtobufMessage *ret_msg = nullptr;
      //
      // 方法的参数和返回值对象只建立一个
      //
      if (req_desc) {
        const auto *arg_type = factory_->GetPrototype(req_desc);
        if (arg_type) {
          arg_msg = arg_type->New();
        }
      }
      if (ack_desc) {
        const auto *ret_type = factory_->GetPrototype(ack_desc);
        if (ret_type) {
          ret_msg = ret_type->New();
        }
      }
      auto method_name = method["name"].asString();
      auto timeout = method["timeout"].asInt();
      auto service_uuid_str = service["uuid"].asString();
      auto service_uuid = std::stoull(service_uuid_str);
      MethodType method_call{
          req_desc, ack_desc, oneway,  service_name,     method_name,
          timeout,  arg_msg,  ret_msg, service_uuid_str, has_ret_value};
      //
      // 放入消息表, 后续用于快速获取
      //
      msg_factory_map_[service_uuid].emplace_back(std::move(method_call));
    }
  }
  return true;
}

auto MsgFactory::new_call_param(rpc::ServiceUUID service_uuid,
                                rpc::MethodID method_id) noexcept
    -> ProtobufMessage * {
  auto it = msg_factory_map_.find(service_uuid);
  if (it == msg_factory_map_.end()) {
    return nullptr;
  }
  const auto &methods = it->second;
  if ((method_id > methods.size()) || (method_id == 0)) {
    return nullptr;
  }
  auto &req_msg_ptr = methods[method_id - 1].request_message;
  if (!req_msg_ptr) {
    return nullptr;
  }
  //
  // 清理后返回, 复用
  //
  req_msg_ptr->Clear();
  return req_msg_ptr.get();
}

auto MsgFactory::new_call_return(rpc::ServiceUUID service_uuid,
                                 rpc::MethodID method_id) noexcept
    -> ProtobufMessage * {
  auto it = msg_factory_map_.find(service_uuid);
  if (it == msg_factory_map_.end()) {
    return nullptr;
  }
  const auto &methods = it->second;
  if ((method_id > methods.size()) || (method_id == 0)) {
    return nullptr;
  }
  auto &rep_msg_ptr = methods[method_id - 1].response_message;
  if (!rep_msg_ptr) {
    return nullptr;
  }
  //
  // 清理后返回, 复用
  //
  rep_msg_ptr->Clear();
  return rep_msg_ptr.get();
}

auto MsgFactory::get_type(rpc::ServiceUUID service_uuid,
                          rpc::MethodID method_id) const noexcept
    -> const MethodType * {
  auto it = msg_factory_map_.find(service_uuid);
  if (it == msg_factory_map_.end()) {
    return nullptr;
  }
  const auto &methods = it->second;
  if ((method_id > methods.size()) || (method_id == 0)) {
    return nullptr;
  }
  return &methods[method_id - 1];
}

LuaThread::LuaThread(lua_State *L, rpc::MethodID method_id,
                     int *global_ref_key) {
  LuaUtil::StackGuard guard(L);
  L_ = L;                 // LUA虚拟机
  method_id_ = method_id; // 方法ID
  global_ref_key_ = global_ref_key;
  LuaUtil::get_registry_table(L_, global_ref_key);
  lua_thread_ = LuaUtil::new_thread(L_);
  register_key_ = LuaUtil::new_registry_ref(L_);
  if (register_key_ < 0) {
    throw std::runtime_error("luaL_ref failed");
  }
  thread_id_ = register_key_;
}

LuaThread::~LuaThread() {}

void LuaThread::cleanup() {
  if (!global_ref_key_) {
    return;
  }
  LuaUtil::StackGuard guard(L_);
  LuaUtil::get_registry_table(L_, global_ref_key_);
  LuaUtil::remove_registry_ref(L_, register_key_);
  global_ref_key_ = nullptr;
  register_key_ = 0;
}

bool LuaThread::call(const std::string &method_name,
                     ProtobufMessage &call_return,
                     const ProtobufMessage &call_param) {
  if (!LuaUtil::get_global_table(lua_thread_, method_name)) {
    write_fail_log("Global lua method not found[" + method_name + "]");
    return false;
  }
  if (manager_) {
    manager_->set_current_thread_id(thread_id_);
  }
  LuaUtil::lua_push(lua_thread_, call_param);
  if (!LuaUtil::resume_nargs(lua_thread_, last_error_, 1)) {
    state_ = ThreadState::DEAD;
    write_fail_log(last_error_);
    return false;
  }
  state_ = LuaUtil::get_status(lua_thread_);
  if (is_yield()) {
    return true;
  } else {
    if (!LuaUtil::lua_pop_result(lua_thread_, call_return)) {
      return false;
    }
  }
  return true;
}

bool LuaThread::call_no_ret(const std::string &method_name,
                            const ProtobufMessage &call_param) {
  if (!LuaUtil::get_global_table(lua_thread_, method_name)) {
    write_fail_log("Global lua method not found[" + method_name + "]");
    return false;
  }
  if (manager_) {
    manager_->set_current_thread_id(thread_id_);
  }
  LuaUtil::lua_push(lua_thread_, call_param);
  if (!LuaUtil::resume_nargs(lua_thread_, last_error_, 1)) {
    state_ = ThreadState::DEAD;
    write_fail_log(last_error_);
    return false;
  }
  state_ = LuaUtil::get_status(lua_thread_);
  return true;
}

bool LuaThread::call_no_ret(const std::string &method_name) {
  if (!LuaUtil::get_global_table(lua_thread_, method_name)) {
    write_fail_log("Global lua method not found[" + method_name + "]");
    return false;
  }
  if (manager_) {
    manager_->set_current_thread_id(thread_id_);
  }
  if (!LuaUtil::resume_nargs(lua_thread_, last_error_, 0)) {
    state_ = ThreadState::DEAD;
    write_fail_log(last_error_);
    return false;
  }
  state_ = LuaUtil::get_status(lua_thread_);
  return true;
}

bool LuaThread::call_no_ret(const std::string &method_name, std::time_t tick) {
  if (!LuaUtil::get_global_table(lua_thread_, method_name)) {
    write_fail_log("Global lua method not found[" + method_name + "]");
    return false;
  }
  if (manager_) {
    manager_->set_current_thread_id(thread_id_);
  }
  LuaUtil::lua_push(lua_thread_, tick);
  if (!LuaUtil::resume_nargs(lua_thread_, last_error_, 1)) {
    state_ = ThreadState::DEAD;
    write_fail_log(last_error_);
    return false;
  }
  state_ = LuaUtil::get_status(lua_thread_);
  return true;
}

ThreadState LuaThread::get_state(int error) {
  switch (error) {
  case LUA_OK:
    return ThreadState::READY;
  case LUA_YIELD:
    return ThreadState::YIELD;
  default:
    return ThreadState::DEAD;
  }
}

bool LuaThread::resume(int nargs) {
  if (is_dead()) {
    return false;
  }
  if (manager_) {
    manager_->set_current_thread_id(thread_id_);
  }
  if (!LuaUtil::resume_nargs(lua_thread_, last_error_, nargs)) {
    state_ = ThreadState::DEAD;
    call_exclude_event_handler(ThreadEvent::EXIT);
    write_fail_log(get_traceback());
    return false;
  }
  state_ = LuaUtil::get_status(lua_thread_);
  call_exclude_event_handler(ThreadEvent::EXIT);
  return true;
}

bool LuaThread::resume(const ProtobufMessage &data, int nargs) {
  if (is_dead()) {
    return false;
  }
  if (manager_) {
    manager_->set_current_thread_id(thread_id_);
  }
  LuaUtil::pbmsg_to_lua_table(lua_thread_, data);
  return resume(nargs);
}

void LuaThread::reset() {
  method_id_ = rpc::INVALID_METHOD_ID;
  service_uuid_ = rpc::INVALID_SERVICE_UUID;
  call_id_ = rpc::INVALID_CALL_ID;
  stub_call_.reset();
  transport_.reset();
  proxy_call_service_uuid_ = rpc::INVALID_SERVICE_UUID;
  proxy_call_method_id_ = rpc::INVALID_METHOD_ID;
  last_error_.clear();
  state_ = ThreadState::READY;
  service_id_ = 0;
  //
  // 清空Lua栈
  //
  lua_settop(lua_thread_, 0);
}

auto LuaThread::write_fail_log(const std::string &error_string) -> void {
  if (manager_ && manager_->get_box()) {
    manager_->get_box()->write_log(lang::LangID::LANG_LUA_ERROR,
                                   Logger::FAILURE, error_string.c_str());
  }
}

auto LuaThread::write_fatal_log(const std::string &error_string) -> void {
  if (manager_ && manager_->get_box()) {
    manager_->get_box()->write_log(lang::LangID::LANG_LUA_ERROR, Logger::FATAL,
                                   error_string.c_str());
  }
}

const std::string &LuaThread::get_traceback() {
  LuaUtil::catch_lua_error(this, lua_status(lua_thread_), last_error_);
  return last_error_;
}

bool LuaThread::is_dead() {
  state_ = get_state(lua_status(lua_thread_));
  return (state_ == ThreadState::DEAD);
}

bool LuaThread::is_ready() {
  state_ = get_state(lua_status(lua_thread_));
  return (state_ == ThreadState::READY);
}

bool LuaThread::is_yield() {
  state_ = get_state(lua_status(lua_thread_));
  return (state_ == ThreadState::YIELD);
}

rpc::MethodID LuaThread::get_method_id() { return method_id_; }

void LuaThread::set_method_id(rpc::MethodID method_id) {
  method_id_ = method_id;
}

lua_State *LuaThread::get_lua_state() { return lua_thread_; }

lua_State *LuaThread::get_lua_main() { return L_; }

ThreadID LuaThread::get_id() { return (ThreadID)register_key_; }

void LuaThread::set_service_uuid(rpc::ServiceUUID service_uuid) {
  service_uuid_ = service_uuid;
}

rpc::ServiceUUID LuaThread::get_service_uuid() { return service_uuid_; }

void ThreadManager::set_current_thread_id(ThreadID thread_id) {
  current_thread_id_ = thread_id;
}

void LuaThread::set_manager(ThreadManager *manager) { manager_ = manager; }

ThreadManager *LuaThread::get_manager() { return manager_; }

void LuaThread::set_proxy_call_service_uuid(rpc::ServiceUUID uuid) {
  proxy_call_service_uuid_ = uuid;
}

rpc::ServiceUUID LuaThread::get_proxy_call_service_uuid() {
  return proxy_call_service_uuid_;
}

void LuaThread::set_proxy_call_method_id(rpc::MethodID method_id) {
  proxy_call_method_id_ = method_id;
}

rpc::MethodID LuaThread::get_proxy_call_method_id() {
  return proxy_call_method_id_;
}

void LuaThread::set_call_id(rpc::CallID call_id) { call_id_ = call_id; }

rpc::CallID LuaThread::get_call_id() { return call_id_; }

void LuaThread::set_transport(rpc::TransportPtr &transport) {
  transport_ = transport;
}

rpc::TransportPtr &LuaThread::get_transport() { return transport_; }

const std::string &LuaThread::get_last_error() { return last_error_; }

void LuaThread::set_stub_call(rpc::StubCallPtr &stub_call) {
  stub_call_ = stub_call;
}

rpc::StubCallPtr &LuaThread::get_stub_call() { return stub_call_; }

void LuaThread::add_exclude_event_handler(ThreadEvent event,
                                          ThreadEventFunc func,
                                          std::uint64_t user_data) {
  auto it = exclude_event_map_.find(event);
  if (it != exclude_event_map_.end()) {
    throw std::runtime_error("add_exclude_event_handler error");
  }
  exclude_event_map_[event] = {func, user_data};
}

void LuaThread::remove_exclude_event_handler(ThreadEvent event) {
  exclude_event_map_.erase(event);
}

void LuaThread::call_exclude_event_handler(ThreadEvent event) {
  if (state_ == ThreadState::NONE || state_ == ThreadState::YIELD) {
    return;
  }
  auto it = exclude_event_map_.find(event);
  if (it == exclude_event_map_.end()) {
    return;
  }
  if (state_ == ThreadState::READY) {
    it->second.callback(it->second.user_data, this, true);
  } else {
    it->second.callback(it->second.user_data, this, false);
  }
  // 执行过后立刻删除
  exclude_event_map_.erase(it);
}

void LuaThread::set_service_id(std::uint64_t service_id) {
  service_id_ = service_id;
}

std::uint64_t LuaThread::get_service_id() { return service_id_; }

ThreadManager::ThreadManager(service::ServiceBox *box, lua_State *L,
                             MsgFactory *msg_factory,
                             LuaServiceImpl *lua_service_ptr) {
  L_ = L;
  msg_factory_ = msg_factory;
  buffer_ = make_unique_pool_ptr<char>(BUFFER_SIZE);
  box_ = box;
  lua_service_ptr_ = lua_service_ptr;
  // 安装协程全局表
  install_thread_table();
}

ThreadManager::~ThreadManager() { thread_map_.clear(); }

void ThreadManager::install_thread_table() {
  LuaUtil::lua_push(L_, &lua_ref_key_);
  LuaUtil::create_table(L_);
  LuaUtil::set_registry_table(L_);
}

ThreadPtr ThreadManager::new_lua_thread(rpc::MethodID method_id) {
  //
  // 建立一个新的协程
  //
  ThreadPtr thread_ptr =
      make_shared_pool_ptr<LuaThread>(L_, method_id, &lua_ref_key_);
  if (!thread_ptr) {
    write_fail_log("Create lua thread failed");
    return nullptr;
  }
  //
  // 添加到管理器
  //
  thread_map_[thread_ptr->get_id()] = thread_ptr;
  //
  // 设置管理器指针
  //
  thread_ptr->set_manager(this);
  if (debugger_) {
    //
    // 如果正在运行调试器则hook当前协程
    //
    debugger_->hook(thread_ptr->get_lua_state());
  }
  return thread_ptr;
}

bool ThreadManager::remove(ThreadID thread_id) {
  //
  // 如果要销毁的协程是当前协程, 重置当前协程ID
  //
  if (current_thread_id_ == thread_id) {
    current_thread_id_ = 0;
  }
  auto it = thread_map_.find(thread_id);
  if (it == thread_map_.end()) {
    //
    // 协程未找到, 仍然返回true, 对实际流程没有作用
    //
    write_fail_log("Create lua thread failed");
    return true;
  }
  if (debugger_) {
    //
    // 如果正在运行调试器则unhook协程
    //
    debugger_->unhook(it->second->get_lua_state());
  }
  //
  // 清理协程
  //
  it->second->cleanup();
  //
  // 从正在运行的协程表内删除
  //
  thread_map_.erase(it);
  return true;
}

bool ThreadManager::remove(const ThreadPtr &coroutine_ptr) {
  return remove(coroutine_ptr->get_id());
}

ThreadPtr ThreadManager::get(ThreadID thread_id) {
  auto it = thread_map_.find(thread_id);
  if (it == thread_map_.end()) {
    return nullptr;
  }
  return it->second;
}

ThreadPtr ThreadManager::get_current_thread() {
  return get(current_thread_id_);
}

ThreadID ThreadManager::get_current_thread_id() { return current_thread_id_; }

bool ThreadManager::call_lua_service_method(
    rpc::StubCallPtr stub_call, rpc::ServiceUUID uuid, rpc::MethodID method_id,
    rpc::CallID call_id, rpc::ServiceID service_id, rpc::TransportPtr transport,
    const char *data, int size) {
  //
  // 获取调用方法原型
  //
  const auto *method_type_info = msg_factory_->get_type(uuid, method_id);
  if (!method_type_info) {
    write_fail_log("Service[" + std::to_string(uuid) + "], Method[" +
                   std::to_string(method_id) + "] not found");
    //
    // 通知RPC框架调用完成, 方法未找到
    //
    rpc_finish(stub_call, rpc::RpcError::NOT_FOUND);
    return false;
  }
  //
  // 获取调用参数PB类
  //
  auto *argument = msg_factory_->new_call_param(uuid, method_id);
  if (!argument) {
    write_fail_log("Service[" + std::to_string(uuid) + "], Method[" +
                   method_type_info->lua_real_method_name + "] not found");
    //
    // 通知RPC框架调用完成, 方法未找到
    //
    rpc_finish(stub_call, rpc::RpcError::NOT_FOUND);
    return false;
  }
  //
  // 反序列化参数PB
  //
  if (!argument->ParseFromArray(data, size)) {
    write_fail_log("Service[" + std::to_string(uuid) + "], Method[" +
                   method_type_info->lua_real_method_name +
                   "] ParseFromArray() failed");
    //
    // 通知RPC框架调用完成, 发生异常
    //
    rpc_finish(stub_call, rpc::RpcError::EXCEPTION);
    return false;
  }
  //
  // 建立一个协程, 在这个协程内执行Lua方法
  //
  auto thread_ptr = new_lua_thread(method_id);
  if (!thread_ptr) {
    write_fail_log("Service[" + std::to_string(uuid) + "], Method[" +
                   method_type_info->lua_real_method_name +
                   "] create coroutine failed");
    //
    // 通知RPC框架调用完成, 发生异常
    //
    rpc_finish(stub_call, rpc::RpcError::EXCEPTION);
    return false;
  }
  thread_ptr->set_stub_call(stub_call);      // 当前StubCall
  current_thread_id_ = thread_ptr->get_id(); // 设置当前协程ID
  thread_ptr->set_service_uuid(uuid);        // 当前service UUID
  thread_ptr->set_call_id(call_id);          // 当前调用ID
  thread_ptr->set_transport(transport);   // 当前调用数据的来源管道
  thread_ptr->set_service_id(service_id); // 当前服务实例ID
  //
  // 调用链记录打点, 服务接收到调用请求
  //
  if (box_ && box_->get_config().is_open_trace() && box_->get_rpc_probe()) {
    on_lua_stub_call_arrived(box_, stub_call, method_type_info, transport,
                             argument);
  }
  // 返回值
  ProtobufMessage *return_value = nullptr;
  if (!method_type_info->oneway) {
    //
    // 非oneway类型, 建立返回值对象
    //
    return_value = msg_factory_->new_call_return(uuid, method_id);
  }
  auto lua_ret = false; // 调用Lua函数是否成功
  //
  // 调用Lua服务方法
  //
  if (method_type_info->has_ret_value && return_value) {
    //
    // 有返回值
    //
    lua_ret = thread_ptr->call(method_type_info->lua_real_method_name,
                               *return_value, *argument);
  } else {
    //
    // oneway, 没有返回值
    //
    lua_ret = thread_ptr->call_no_ret(method_type_info->lua_real_method_name,
                                      *argument);
  }
  if (!lua_ret) {
    //
    // 调用Lua方法发生错误
    //
    write_fail_log("Service[" + std::to_string(uuid) + "], Method[" +
                   method_type_info->method_name + "] invoke failed\n" +
                   thread_ptr->get_last_error());
    //
    // 重置当前协程ID
    //
    current_thread_id_ = 0;
    //
    // 通知RPC框架调用完成, 发生异常
    //
    rpc_finish(stub_call, rpc::RpcError::EXCEPTION);
    //
    // 销毁协程
    //
    remove(thread_ptr);
    return false;
  }
  if (thread_ptr->is_yield()) {
    //
    // Lua协程在执行的过程中YIELD, 返回, 协程会根据YIELD条件被框架唤醒
    //
    return true;
  }
  //
  // 执行完毕，销毁协程
  //
  remove(thread_ptr);
  //
  // 调用链记录打点, 服务返回返回值
  //
  if (box_ && box_->get_config().is_open_trace() && box_->get_rpc_probe() &&
      (!method_type_info->oneway) && return_value) {
    on_lua_stub_call_return(box_, stub_call, method_type_info, transport,
                            return_value);
  }
  //
  // 检查是否有返回值
  //
  if (method_type_info->has_ret_value && return_value) {
    //
    // 有返回值则发送返回包
    //
    rpc::RpcRetHeader ret_header;
    auto body_size = return_value->ByteSizeLong();
    // 建立包头
    rpc::buildHeader(ret_header, call_id, service_id,
                     (rpc::ErrorID)rpc::RpcError::OK, (std::uint32_t)body_size);
    // 序列化
    if (!return_value->SerializeToArray(buffer_.get(), BUFFER_SIZE)) {
      write_fail_log("Service[" + std::to_string(uuid) + "], Method[" +
                     method_type_info->method_name +
                     "] SerializeToArray() failed");
      //
      // 通知RPC框架调用完成, 发生异常
      //
      rpc_finish(stub_call, rpc::RpcError::EXCEPTION);
      return false;
    }
    // 发送
    transport->send(reinterpret_cast<const char *>(&ret_header),
                    sizeof(ret_header));
    transport->send(buffer_.get(), (int)body_size);
  } else {
    //
    // 非oneway方法，没有返回值也需要发送返回包
    //
    if (!method_type_info->oneway) {
      // 有返回值则发送返回包
      rpc::RpcRetHeader ret_header;
      // 建立包头
      rpc::buildHeader(ret_header, call_id, service_id,
                       (rpc::ErrorID)rpc::RpcError::OK, 0);
      // 发送
      transport->send(reinterpret_cast<const char *>(&ret_header),
                      sizeof(ret_header));
    }
  }
  //
  // 通知RPC框架调用完成
  //
  rpc_finish(stub_call, rpc::RpcError::OK);
  return true;
}

auto on_lua_proxy_sent(service::ServiceBox *box, ProxyCallLua *proxy_ptr,
                       const MethodType *method_type_info,
                       rpc::TransportPtr transport, ProtobufMessage *msg)
    -> void {
  if (!box || !proxy_ptr || !method_type_info || !transport || !msg) {
    return;
  }
  std::time_t fork_ts{0};
  proxy_ptr->getRpc()->getRpcProbe()->on_get_micro_now(fork_ts);
  proxy_ptr->setForkTimestamp(fork_ts);
  std::string trace_id = proxy_ptr->getRpc()->getCurTraceID();
  std::uint64_t span_id{0};
  proxy_ptr->getRpc()->getRpcProbe()->on_gen_uuid(span_id);
  std::uint64_t parent_span_id{0};
  if (trace_id.empty()) {
    proxy_ptr->getRpc()->getRpcProbe()->on_gen_uuid(trace_id);
  }
  parent_span_id = proxy_ptr->getSpanID();
  proxy_ptr->setTraceID(trace_id);
  proxy_ptr->setSpanID(span_id);
  proxy_ptr->setParentSpanID(parent_span_id);
  set_trace_info(*msg, trace_id, span_id, parent_span_id);
  if (box->get_rpc_probe()) {
    box->get_rpc_probe()->on_proxy_call_sent(
        trace_id, span_id, parent_span_id, transport,
        method_type_info->service_name, method_type_info->method_name,
        msg->DebugString(), method_type_info->oneway);
  }
}

int ThreadManager::lua_call_proxy_method(rpc::ServiceUUID service_uuid,
                                         rpc::ServiceID service_id,
                                         rpc::ProxyID proxy_id,
                                         rpc::MethodID method_id) {
  //
  // 获取当前协程
  //
  auto thread_ptr = get_current_thread();
  if (!thread_ptr) {
    write_fatal_log("Current lua thread not found, service UUID[" +
                    std::to_string(service_uuid) + "], method ID[" +
                    std::to_string(method_id) + "]");
    return 0;
  }
  LuaUtil::BoolPusher pusher(thread_ptr->get_lua_state());
  //
  // 获取方法原型
  //
  const auto *method_type = msg_factory_->get_type(service_uuid, method_id);
  if (!method_type) {
    //
    // 调用失败
    //
    write_fatal_log("Current proxy method not found, service UUID[" +
                    std::to_string(service_uuid) + "], method ID[" +
                    std::to_string(method_id) + "]");
    return pusher.return_value();
  }
  //
  // 查找代理
  //
  auto proxy_ptr = get_proxy(proxy_id);
  if (!proxy_ptr) {
    //
    // 调用失败
    //
    write_fail_log("Service[" + method_type->service_name + "], Method[" +
                   method_type->method_name + "] proxy not found");
    return pusher.return_value();
  }
  //
  // 建立参数类
  //
  auto *argument = msg_factory_->new_call_param(service_uuid, method_id);
  if (!argument) {
    //
    // 调用失败
    //
    write_fail_log("Service[" + method_type->service_name + "], Method[" +
                   method_type->method_name + "] not found");
    return pusher.return_value();
  }
  //
  // 将参数从栈取出
  //
  LuaUtil::lua_table_to_pbmsg(thread_ptr->get_lua_state(), *argument);
  ProxyCallLuaPtr lua_proxy_call; // ProxyCall
  auto transport_ptr = proxy_ptr->getTransport();
  if (box_ && box_->get_rpc()) {
    //
    // 建立rpc::ProxyCall
    //
    lua_proxy_call = box_->get_rpc()->make_shared_ptr<ProxyCallLua>(
        box_->get_rpc(), transport_ptr, proxy_id, this, method_type->timeout);
  }
  //
  // 调用链记录打点, 调用远程服务
  //
  if (box_ && box_->get_config().is_open_trace() && box_->get_rpc_probe()) {
    on_lua_proxy_sent(box_, lua_proxy_call.get(), method_type, transport_ptr,
                      argument);
  }
  auto body_size = argument->ByteSizeLong(); // 参数字节流长度
  //
  // 序列化
  //
  if (!argument->SerializeToArray(buffer_.get(), (int)body_size)) {
    //
    // 调用失败
    //
    write_fail_log("Service[" + std::to_string(service_uuid) + "], Method[" +
                   method_type->method_name + "] SerializeToArray() failed");
    return pusher.return_value();
  }
  rpc::ProxyCallManager *proxy_call_manager{nullptr}; // ProxyCallManager
  rpc::CallID proxy_call_id{rpc::INVALID_CALL_ID};    // CallID
  if (box_ && box_->get_rpc()) {
    proxy_call_manager = box_->get_rpc()->getProxyCallManager();
    //
    // 加入到调用管理器
    //
    proxy_call_manager->add(lua_proxy_call);
    //
    // 获取到CallID
    //
    proxy_call_id = lua_proxy_call->getCallID();
    //
    // 调用代理
    //
    if (!send_proxy_call(service_uuid, service_id, method_id, proxy_call_id,
                         transport_ptr, buffer_.get(), body_size)) {
      //
      // 调用失败
      //
      proxy_call_manager->destroy(proxy_call_id);
      return pusher.return_value();
    }
  }
  //
  // 设置当前正在发起调用的proxy对应的service UUID
  //
  thread_ptr->set_proxy_call_service_uuid(service_uuid);
  //
  // 设置当前正在发起调用的proxy对应的方法ID
  //
  thread_ptr->set_proxy_call_method_id(method_id);
  if (!method_type->oneway) {
    //
    // 非oneway方法则调用完成后lua代码内YIELD
    //
    return pusher.return_value(true);
  } else {
    //
    // oneway方法不需要YIELD
    //
    if (proxy_call_manager) {
      proxy_call_manager->destroy(proxy_call_id);
    }
    return pusher.return_value(false);
  }
}

auto on_lua_proxy_return(service::ServiceBox *box, rpc::ProxyCall *proxy_ptr,
                         const MethodType *method_type_info,
                         rpc::TransportPtr transport, ProtobufMessage *msg)
    -> void {
  if (!box || !proxy_ptr || !method_type_info || !transport) {
    return;
  }
  std::string trace_id = proxy_ptr->getTraceID();
  std::uint64_t span_id = proxy_ptr->getSpanID();
  std::uint64_t parent_span_id = proxy_ptr->getParentSpanID();
  box->get_rpc_probe()->on_proxy_call_return_arrived(
      trace_id, span_id, parent_span_id, transport,
      method_type_info->service_name, method_type_info->method_name,
      msg ? msg->DebugString() : "", proxy_ptr->getForkTimestamp());
}

bool ThreadManager::proxy_call_return_to_lua(rpc::ProxyCall *proxy_ptr,
                                             rpc::ServiceID service_id,
                                             rpc::CallID call_id,
                                             const char *data,
                                             std::size_t size) {
  //
  // 获取ProxyCall调用对应的协程
  //
  auto thread_ptr = get_thread_by_call_id(call_id);
  if (!thread_ptr) {
    write_fail_log("Thread not found, call ID[" + std::to_string(call_id) +
                   "]");
    return false;
  }
  //
  // 设置当前协程
  //
  current_thread_id_ = thread_ptr->get_id();
  //
  // 获取proxy call的方法原型
  //
  const auto *method_type_info =
      msg_factory_->get_type(thread_ptr->get_proxy_call_service_uuid(),
                             thread_ptr->get_proxy_call_method_id());
  if (!method_type_info) {
    write_fail_log("Service[" +
                   std::to_string(thread_ptr->get_proxy_call_service_uuid()) +
                   "], Method ID[" +
                   std::to_string(thread_ptr->get_proxy_call_method_id()) +
                   "] SerializeToArray() failed");
    remove(thread_ptr);
    return false;
  }
  //
  // 返回服务实例ID, Lua代码会获取服务实例ID
  //
  LuaUtil::lua_push(thread_ptr->get_lua_state(), service_id);
  //
  // 处理调用返回值
  //
  if (method_type_info->has_ret_value) {
    //
    // 唤醒Lua协程处理返回值
    //
    if (!proxy_call_return_message(thread_ptr, data, size, 2)) {
      write_fail_log("Push lua method proxy call return value failed[" +
                     method_type_info->method_name + "]");
      remove(thread_ptr);
      return false;
    }
  } else {
    //
    // 恢复协程运行, 弹出service ID
    //
    if (!thread_ptr->resume(1)) {
      write_fail_log("Resume lua method failed[" +
                     method_type_info->method_name + "]");
      remove(thread_ptr);
      return false;
    }
  }
  if (thread_ptr->is_yield()) {
    //
    // Lua函数再次yield
    //
    return true;
  } else {
    //
    // 调用链记录打点, 调用远程服务返回
    //
    if (box_ && proxy_ptr && box_->get_config().is_open_trace() &&
        box_->get_rpc_probe()) {
      // 建立参数类
      auto *ret_val = msg_factory_->new_call_return(
          thread_ptr->get_service_uuid(), thread_ptr->get_method_id());
      if (ret_val) {
        // 反序列化参数
        if (!ret_val->ParseFromArray(data, (int)size)) {
          write_fail_log("Service[" +
                         std::to_string(thread_ptr->get_service_uuid()) +
                         "], Method[" + method_type_info->lua_real_method_name +
                         "] ParseFromArray() failed");
          remove(thread_ptr);
          return false;
        }
      }
      on_lua_proxy_return(box_, proxy_ptr, method_type_info,
                          proxy_ptr->getTransport(), ret_val);
    }
    //
    // 如果ProxyCall处于StubCall的执行过程中, 执行完毕后返回调用返回值
    //
    if (thread_ptr->get_stub_call()) {
      check_and_return_stub_call(thread_ptr);
    }
  }
  return true;
}

lua_State *ThreadManager::get_lua_state() { return L_; }

void ThreadManager::check_and_return_stub_call(ThreadPtr thread_ptr) {
  //
  // 执行完毕后返回调用返回值
  //
  const auto *real_method_call_type = msg_factory_->get_type(
      thread_ptr->get_service_uuid(), thread_ptr->get_proxy_call_method_id());
  if (!real_method_call_type) {
    write_fail_log("Stub call method return value type not found, service[" +
                   std::to_string(thread_ptr->get_service_uuid()) +
                   ", method[" + std::to_string(thread_ptr->get_method_id()) +
                   "]");
    remove(thread_ptr);
    return;
  }
  if (real_method_call_type->oneway) {
    //
    // 单路, 协程执行完成，销毁
    //
    remove(thread_ptr);
    return;
  }
  //
  // 发送返回值
  //
  send_stub_call_return(thread_ptr);
  //
  // 通知框架调用完成
  //
  rpc_finish(thread_ptr->get_stub_call(), rpc::RpcError::OK);
  //
  // 协程执行完成，销毁
  //
  remove(thread_ptr);
}

void ThreadManager::set_debugger(Debugger *debugger) { debugger_ = debugger; }

auto ThreadManager::get_debugger() -> Debugger * { return debugger_; }

auto ThreadManager::get_thread_num() -> std::size_t {
  return thread_map_.size();
}

auto ThreadManager::get_all_thread() -> const ThreadMap & {
  return thread_map_;
}

auto ThreadManager::get_proxy(rpc::ProxyID proxy_id) -> ProxyLuaPtr {
  auto it = proxy_map_.find(proxy_id);
  if (it == proxy_map_.end()) {
    return nullptr;
  }
  return it->second;
}

auto ThreadManager::add_proxy(ProxyLuaPtr proxy_lua_ptr) -> bool {
  if (proxy_map_.find(proxy_lua_ptr->getID()) != proxy_map_.end()) {
    return false;
  }
  proxy_map_[proxy_lua_ptr->getID()] = proxy_lua_ptr;
  auto proxy_ptr = std::dynamic_pointer_cast<rpc::Proxy>(proxy_lua_ptr);
  box_->get_rpc()->getProxyManager()->add(proxy_ptr);
  return true;
}

bool ThreadManager::send_proxy_call(rpc::ServiceUUID service_uuid,
                                    rpc::ServiceID service_id,
                                    rpc::MethodID method_id,
                                    rpc::CallID call_id,
                                    rpc::TransportPtr transport,
                                    const char *data, std::size_t size) {
  //
  // 记录调用与协程的对应关系
  //
  if (!add_proxy_call(call_id, current_thread_id_)) {
    //
    // 致命错误
    //
    return false;
  }
  // 建立包头
  rpc::RpcCallHeader header;
  rpc::buildHeader(header, service_uuid, service_id, call_id, method_id,
                   (int)size);
  // 发送调用请求
  if (sizeof(header) != transport->send(reinterpret_cast<const char *>(&header),
                                        sizeof(header))) {
    return false;
  }
  if ((int)size != transport->send(data, (int)size)) {
    return false;
  }
  return true;
}

void ThreadManager::send_stub_call_return(ThreadPtr &thread_ptr) {
  rpc::RpcRetHeader header;
  auto service_uuid = thread_ptr->get_service_uuid();
  auto service_id = thread_ptr->get_service_id();
  auto method_id = thread_ptr->get_stub_call()->getMethodID();
  auto *vm = thread_ptr->get_lua_state();
  auto call_id = thread_ptr->get_call_id();
  auto &transport = thread_ptr->get_transport();
  auto &stub_call = thread_ptr->get_stub_call();
  auto *method_type = msg_factory_->get_type(service_uuid, method_id);
  auto *method_ret = msg_factory_->new_call_return(service_uuid, method_id);
  auto error = rpc::RpcError::OK;
  //
  // 返回PB内设置trace信息
  //
  set_trace_info(*method_ret, stub_call->getTraceID(), stub_call->getSpanID(),
                 stub_call->getParentSpanID());
  std::size_t body_size = 0;
  if (method_type->has_ret_value) {
    //
    // 有函数返回值, 返回值必须为table
    //
    if (!lua_istable(vm, -1)) {
      error = rpc::RpcError::EXCEPTION;
    } else {
      LuaUtil::lua_table_to_pbmsg(vm, *method_ret);
      body_size = method_ret->ByteSizeLong();
      if (!method_ret->SerializeToArray(buffer_.get(), (int)body_size)) {
        error = rpc::RpcError::EXCEPTION;
        body_size = 0;
      }
    }
  } else {
    //
    // 没有函数返回值
    //
    body_size = method_ret->ByteSizeLong();
    if (!method_ret->SerializeToArray(buffer_.get(), (int)body_size)) {
      error = rpc::RpcError::EXCEPTION;
      body_size = 0;
    }
  }
  //
  // 记录调用信息
  //
  on_lua_stub_call_return(box_, stub_call, method_type, transport, method_ret);
  //
  // 建立返回包并发送
  //
  rpc::buildHeader(header, call_id, service_id, (rpc::ErrorID)error,
                   (int)body_size);
  transport->send((const char *)&header, sizeof(header));
  if (body_size) {
    transport->send(buffer_.get(), (int)body_size);
  }
}

ThreadPtr ThreadManager::get_thread_by_call_id(rpc::CallID call_id) {
  //
  // 获取调用对应的协程ID
  //
  auto it = proxy_call_thread_map_.find(call_id);
  if (it == proxy_call_thread_map_.end()) {
    return nullptr;
  }
  return get(it->second);
}

bool ThreadManager::proxy_call_return_message(ThreadPtr &thread_ptr,
                                              const char *data,
                                              std::size_t size, int nargs) {
  //
  // 获取ProxyCall返回值PB对象
  //
  auto *ret_value =
      msg_factory_->new_call_return(thread_ptr->get_proxy_call_service_uuid(),
                                    thread_ptr->get_proxy_call_method_id());
  if (!ret_value) {
    write_fail_log(
        "Stub call method return value type not found, service UUID[" +
        std::to_string(thread_ptr->get_service_uuid()) + ", method ID[" +
        std::to_string(thread_ptr->get_method_id()) + "]");
    return false;
  }
  //
  // 反序列化
  //
  if (!ret_value->ParseFromArray(data, (int)size)) {
    write_fail_log("ParseFromArray failed, service UUID[" +
                   std::to_string(thread_ptr->get_service_uuid()) +
                   ", method ID[" +
                   std::to_string(thread_ptr->get_method_id()) + "]");
    return false;
  }
  //
  // 压栈返回值并唤醒协程
  //
  if (!thread_ptr->resume(*ret_value, nargs)) {
    write_fail_log(thread_ptr->get_last_error());
    return false;
  }
  return true;
}

auto ThreadManager::write_fail_log(const std::string &error_string) -> void {
  if (box_) {
    box_->write_log(lang::LangID::LANG_LUA_ERROR, Logger::FAILURE,
                    error_string.c_str());
  }
}

auto ThreadManager::write_fatal_log(const std::string &error_string) -> void {
  if (box_) {
    box_->write_log(lang::LangID::LANG_LUA_ERROR, Logger::FATAL,
                    error_string.c_str());
  }
}

void ThreadManager::rpc_finish(rpc::StubCallPtr &stub_call,
                               rpc::RpcError error) {
  if (error != rpc::RpcError::OK) {
    //
    // 当发生错误时需要回复一个错误
    //
    rpc::RpcRetHeader ret_header;
    // 建立包头
    rpc::buildHeader(ret_header, stub_call->getCallID(),
                     stub_call->getServiceID(), (rpc::ErrorID)error, 0);
    // 发送
    stub_call->getTransport()->send(reinterpret_cast<const char *>(&ret_header),
                                    sizeof(ret_header));
  }
  if (stub_call) {
    //
    // 调用完成, 框架清理本次StubCall
    //
    stub_call->finish();
  }
}

void ThreadManager::remove_proxy(rpc::ProxyID proxy_id) {
  box_->get_rpc()->getProxyManager()->destroy(proxy_id);
  proxy_map_.erase(proxy_id);
}

bool ThreadManager::add_proxy_call(rpc::CallID call_id, ThreadID thread_id) {
  if (proxy_call_thread_map_.end() != proxy_call_thread_map_.find(call_id)) {
    write_fatal_log("Duplicated call ID");
    return false;
  }
  proxy_call_thread_map_[call_id] = thread_id;
  return true;
}

void ThreadManager::remove_proxy_call(rpc::CallID call_id) {
  box_->get_rpc()->getProxyCallManager()->destroy(call_id);
  proxy_call_thread_map_.erase(call_id);
}

rpc::ProxyID ThreadManager::try_get_proxy_id(rpc::ServiceUUID service_uuid,
                                             const std::string &service_name) {
  //
  // 通过服务名得到对应的管道
  //
  auto transport = box_->try_get_transport(service_name);
  if (!transport) {
    return rpc::INVALID_PROXY_ID;
  }
  auto proxy_lua = make_shared_pool_ptr<ProxyLua>(
      box_->get_rpc(), rpc::getGlobalProxyCreator().createProxyID(),
      service_uuid, msg_factory_, this);
  if (!proxy_lua) {
    //
    // 内存分配失败
    //
    write_fatal_log("Bad alloc");
    return rpc::INVALID_PROXY_ID;
  }
  proxy_lua->setTransport(transport); // 设置远端管道
  //
  // 设置Proxy绑定到上次调用的服务实例, 不需要每次调用都使用不同的服务实例
  //
  proxy_lua->attach();
  //
  // 将ProxyLua添加到ProxyManager
  //
  add_proxy(proxy_lua);
  return proxy_lua->getID();
}

rpc::ProxyID ThreadManager::try_get_proxy_id(rpc::ServiceUUID service_uuid,
                                             rpc::GlobalIndex global_index,
                                             rpc::TransportPtr transport) {
  auto proxy_lua = make_shared_pool_ptr<ProxyLua>(
      box_->get_rpc(), rpc::getGlobalProxyCreator().createProxyID(),
      service_uuid, msg_factory_, this);
  if (!proxy_lua) {
    //
    // 内存分配失败
    //
    write_fatal_log("Bad alloc");
    return rpc::INVALID_PROXY_ID;
  }
  proxy_lua->setTransport(transport);
  //
  // 设置Proxy绑定到上次调用的服务实例, 不需要每次调用都使用不同的服务实例
  //
  proxy_lua->attach();
  //
  // 设置全局GlobalIndex
  //
  proxy_lua->setGlobalIndex(global_index);
  //
  // 将ProxyLua添加到ProxyManager
  //
  add_proxy(proxy_lua);
  return proxy_lua->getID();
}

rpc::ProxyID ThreadManager::try_get_proxy_id(rpc::ServiceUUID service_uuid,
                                             rpc::TransportPtr transport) {
  auto proxy_lua = make_shared_pool_ptr<ProxyLua>(
      box_->get_rpc(), rpc::getGlobalProxyCreator().createProxyID(),
      service_uuid, msg_factory_, this);
  if (!proxy_lua) {
    //
    // 内存分配失败
    //
    write_fatal_log("Bad alloc");
    return rpc::INVALID_PROXY_ID;
  }
  proxy_lua->setTransport(transport);
  //
  // 设置Proxy绑定到上次调用的服务实例, 不需要每次调用都使用不同的服务实例
  //
  proxy_lua->attach();
  //
  // 将ProxyLua添加到ProxyManager
  //
  add_proxy(proxy_lua);
  return proxy_lua->getID();
}

rpc::TransportPtr ThreadManager::get_proxy_transport(rpc::ProxyID proxy_id) {
  auto it = proxy_map_.find(proxy_id);
  if (it == proxy_map_.end()) {
    return nullptr;
  }
  return it->second->getTransport();
}

char *ThreadManager::get_buffer() { return buffer_.get(); }

int ThreadManager::get_buffer_len() { return BUFFER_SIZE; }

service::ServiceBox *ThreadManager::get_box() { return box_; }

LuaServiceImpl *ThreadManager::get_lua_service() { return lua_service_ptr_; }

ProxyCallLua::ProxyCallLua(rpc::Rpc *rpc, rpc::TransportPtr transport,
                           rpc::ProxyID proxy_id, ThreadManager *thread_manager,
                           int timeout) {
  rpc_ = rpc;
  transport_ = transport;
  proxy_id_ = proxy_id;
  thread_manager_ = thread_manager;
  setTimeout(timeout);
}

ProxyCallLua::~ProxyCallLua() {}

void ProxyCallLua::doRet(int bytes) {
  if (!transport_) {
    thread_manager_->write_fail_log("Transport not found");
    //
    // 销毁ProxyCall实例
    //
    thread_manager_->remove_proxy_call(getCallID());
    return;
  }
  auto *buffer = thread_manager_->get_buffer();
  if (bytes && (bytes != transport_->recv(buffer, bytes))) {
    thread_manager_->write_fail_log("Transport receiving failed");
    //
    // 销毁ProxyCall实例
    //
    thread_manager_->remove_proxy_call(getCallID());
    return;
  }
  auto proxy_ptr = thread_manager_->get_proxy(getProxyID());
  if (proxy_ptr) {
    thread_manager_->proxy_call_return_to_lua(this, proxy_ptr->getTarget(),
                                              getCallID(), buffer, bytes);
  } else {
    // Proxy已经销毁但是还有未完成的ProxyCall到来
    thread_manager_->proxy_call_return_to_lua(nullptr, 0, getCallID(), buffer,
                                              bytes);
  }
  // 销毁ProxyCall实例
  thread_manager_->remove_proxy_call(getCallID());
}

rpc::ProxyID ProxyCallLua::getProxyID() { return proxy_id_; }

ProxyLua::ProxyLua(rpc::Rpc *rpc_ptr, rpc::ProxyID proxy_id,
                   rpc::ServiceUUID service_uuid, MsgFactory *message_factory,
                   ThreadManager *thread_manager_ptr) {
  service_uuid_ = service_uuid;
  msg_factory_ = message_factory;
  thread_manager_ptr_ = thread_manager_ptr;
  setRpc(rpc_ptr);
  setID(proxy_id);
  getSubscriber()->open_coro_mode(false);
}

ProxyLua::~ProxyLua() {
  thread_manager_ptr_->get_lua_service()->remove_all_sub_event(getID());
}

const char *ProxyLua::getSignature(rpc::MethodID methodID) const {
  const auto *method_call_type =
      msg_factory_->get_type(service_uuid_, methodID);
  return method_call_type->method_name.c_str();
}

rpc::ServiceUUID ProxyLua::getServiceUUID() { return service_uuid_; }

bool ProxyLua::isOneway(rpc::MethodID methodID) {
  const auto *method_call_type =
      msg_factory_->get_type(service_uuid_, methodID);
  return method_call_type->oneway;
}

void ProxyLua::on_event(const std::string &evt_name, const char *data,
                        std::size_t length) {
  std::string str(data, length);
  thread_manager_ptr_->get_lua_service()->call_sub_event_cb(getID(), evt_name,
                                                            str);
}

LuaServiceImpl::LuaServiceImpl(service::ServiceBox *box) { box_ = box; }

LuaServiceImpl::~LuaServiceImpl() {}

auto LuaServiceImpl::start_internal() -> bool {
  msg_factory_ = make_unique_pool_ptr<MsgFactory>(box_);
  if (!msg_factory_->load(idl_json_file_path_, idl_proto_root_dir_)) {
    return false;
  }
  L_ = lua_newstate(lua_mem_alloc, nullptr);
  if (!L_) {
    return false;
  }
  luaL_openlibs(L_);
  LuaUtil::lua_push(L_, (void *)this);
  lua_setfield(L_, LUA_REGISTRYINDEX, "service");
  thread_manager_ =
      make_unique_pool_ptr<ThreadManager>(box_, L_, msg_factory_.get(), this);
  if (!thread_manager_) {
    return false;
  }
  timer_wheel_ = make_unique_pool_ptr<util::TimerWheel>(box_);
  // 注册全局函数
  install_global_function();
  // 安装热更新
  if (!install_hotfix()) {
    // 安装热加载失败
    return false;
  }
  if (!install_context()) {
    // 安装模块失败
    return false;
  }
  if (!install_timer_table()) {
    // 安装定时器表失败
    return false;
  }
  if (!install_sub_pub_table()) {
    // 安装发布订阅失败
    return false;
  }
  if (!install_proxy(proxy_root_dir_)) {
    // 安装lua代理框架失败
    return false;
  }
  if (!install_stub(stub_root_dir_)) {
    // 安装lua stub框架失败
    return false;
  }
  if (!install_modules()) {
    // 安装系统模块失败
    return false;
  }
  log_history_ = kratos::make_unique_pool_ptr<LogHistoryImpl>();
  return true;
}

auto LuaServiceImpl::setup_path(const std::string &service_name) -> bool {
  if (box_ && box_->get_config().has_attribute("lua.root_dir")) {
    lua_root_dir_ = box_->get_config().get_string("lua.root_dir");
  }
  if (box_ && box_->get_config().has_attribute("lua.hotfix_check_second")) {
    hotfix_check_second_ =
        box_->get_config().get_number<std::time_t>("lua.hotfix_check_second");
  }
  namespace fs = std::filesystem;
  // 配置根目录为 lua_root_dir_/service_name, 与服务容器在同一个目录
  std::string config_root_dir =
      (fs::path(lua_root_dir_) / fs::path(service_name)).string();
  // IDL对应的JSON文件
  idl_json_file_path_ = (fs::path(config_root_dir) / fs::path("json")).string();
  // .proto文件目录
  idl_proto_root_dir_ =
      (fs::path(config_root_dir) / fs::path("proto")).string();
  // Lua文件存放的根目录
  std::string lua_root_dir =
      (fs::path(config_root_dir) / fs::path("script")).string();
  // Lua入口主文件
  lua_root_file_ = (fs::path(lua_root_dir) / fs::path("main.lua")).string();
  // proxy根目录
  proxy_root_dir_ = (fs::path(lua_root_dir_) / fs::path("lua_proxy")).string();
  // stub根目录
  stub_root_dir_ = (fs::path(config_root_dir) / fs::path("stub")).string();
  // 源代码目录
  source_path_ = lua_root_dir;
  return true;
}

auto LuaServiceImpl::do_hotfix() -> void {
  std::string file;
  while (hotfix_queue_.try_dequeue(file)) {
    hotfix_file(file);
  }
}

auto LuaServiceImpl::is_open_hotfix() -> bool {
  if (!box_ || !box_->get_config().has_attribute("lua.open_hotfix")) {
    return false;
  }
  try {
    return box_->get_config().get_bool("lua.open_hotfix");
  } catch (...) {
    return false;
  }
  return false;
}

auto LuaServiceImpl::start(const std::string &service_name,
                           rpc::ServiceUUID uuid, rpc::Publisher *publisher_ptr)
    -> bool {
  if (hotfix_checker_.joinable()) {
    checker_running_ = false;
    hotfix_checker_.join();
  }
  service_name_ = service_name;
  service_uuid_ = uuid;
  publisher_ptr_ = publisher_ptr;
  if (!setup_path(service_name)) {
    return false;
  }
  if (!start_internal()) {
    return false;
  }
  if (box_) {
    box_->write_log_line(klogger::Logger::INFORMATION,
                         "[lua][" + service_name_ + "]load main file[" +
                             lua_root_file_ + "]...");
  }
  // 加载并运行入口脚本
  auto error = luaL_dofile(L_, lua_root_file_.c_str());
  if (error) {
    write_fatal_log(lua_tostring(L_, -1));
    return false;
  } else {
    if (box_) {
      box_->write_log_line(klogger::Logger::INFORMATION,
                           "[lua][" + service_name_ + "]load main file[" +
                               lua_root_file_ + "]...done");
    }
  }
  //
  // 开启热更新检测线程
  //
  if (is_open_hotfix()) {
    checker_running_ = true;
    hotfix_checker_ =
        std::thread(std::bind(&LuaServiceImpl::hotfix_checker_main, this));
  }
  lua_on_event_func_name_ = "_" + std::to_string(service_uuid_) + "_on_event";
  lua_on_cancel_func_name_ = "_" + std::to_string(service_uuid_) + "_on_cancel";
  // 调用服务初始化方法
  return call_after_fork();
}

auto LuaServiceImpl::stop() -> bool {
  checker_running_ = false;
  if (hotfix_checker_.joinable()) {
    hotfix_checker_.join();
  }
  std::string error;
  auto run_ret =
      LuaUtil::call_lua_function_no_ret(before_destroy_func_name_, L_, error);
  if (!run_ret) {
    write_fail_log(
        "Call lua service method failed, method 'on_before_destroy':" + error);
  }
  close_debugger();
  thread_manager_.reset();
  timer_wheel_.reset();
  module_cleanup();
  close_timer_ref();
  close_sub_pub_ref();
  log_history_.reset();
  // 最后销毁, 防止协程有引用
  if (L_) {
    lua_close(L_);
    L_ = nullptr;
  }
  return true;
}

auto LuaServiceImpl::update(std::time_t tick) -> void {
  if (lua_debug_) {
    lua_debug_->update(tick);
    // 调试器挂起了虚拟机
    if (lua_debug_->is_suspend()) {
      return;
    }
  }
  timer_wheel_->update(tick);
  run_tick_once(tick);
  module_update(tick);
  do_hotfix();
  //
  // 每秒检测Lua虚拟机内存使用量
  //
  if (!last_mem_tick_) {
    last_mem_tick_ = tick;
  }
  if (tick - last_mem_tick_ > 1000) {
    auto cur_mem_size =
        (lua_gc(L_, LUA_GCCOUNT, 0) << 10) + lua_gc(L_, LUA_GCCOUNTB, 0);
    if (!last_mem_size_) {
      last_mem_size_ = cur_mem_size;
    }
    if (cur_mem_size > last_mem_size_) {
      increase_trend_count_ += 1;
    } else {
      increase_trend_count_ -= 1;
    }
    last_mem_size_ = cur_mem_size;
    last_mem_tick_ = tick;
    //
    // 持续上涨则打印日志
    //
    // if (increase_trend_count_ > 100) {
    //  if (box_) {
    //    box_->write_log_line(klogger::Logger::WARNING,
    //                         "[lua][" + service_name_ + "]Lua memory in use["
    //                         +
    //                             util::readable_size(cur_mem_size) + "]");
    //  }
    //}
  }
}

auto LuaServiceImpl::call(rpc::StubCallPtr stub_call) -> void {
  if (stub_call->getServiceUUID() != service_uuid_) {
    return;
  }
  if (thread_manager_) {
    int size = 0;
    auto *data = stub_call->getData(size);
    auto ret = thread_manager_->call_lua_service_method(
        stub_call, stub_call->getServiceUUID(), stub_call->getMethodID(),
        stub_call->getCallID(), stub_call->getServiceID(),
        stub_call->getTransport(), data, size);
    if (!ret) {
      write_fail_log("Service[" + std::to_string(stub_call->getServiceUUID()) +
                     "], Method[" + std::to_string(stub_call->getMethodID()) +
                     "] not found");
      return;
    }
  }
}

auto LuaServiceImpl::on_subscribe(const rpc::SubID &sub_id,
                                  const std::string &evt_name, const char *data,
                                  std::size_t length) -> void {
  std::string error;
  auto thread_ptr = thread_manager_->new_lua_thread(0);
  std::string str(data, length);
  auto run_ret = LuaUtil::call_lua_function_no_ret(
      lua_on_event_func_name_, thread_ptr->get_lua_state(), error, sub_id,
      evt_name, str);
  if (thread_ptr->is_yield()) {
    add_yield_thread(thread_ptr);
    return;
  }
  if (!run_ret) {
    thread_manager_->remove(thread_ptr);
    write_fatal_log("Call lua service method 'on_event' failed:" + error);
    return;
  }
  thread_manager_->remove(thread_ptr);
}

auto LuaServiceImpl::on_cancel(const rpc::SubID &sub_id) -> void {
  std::string error;
  auto thread_ptr = thread_manager_->new_lua_thread(0);
  auto run_ret = LuaUtil::call_lua_function_no_ret(
      lua_on_cancel_func_name_, thread_ptr->get_lua_state(), error, sub_id);
  if (thread_ptr->is_yield()) {
    add_yield_thread(thread_ptr);
    return;
  }
  if (!run_ret) {
    thread_manager_->remove(thread_ptr);
    write_fatal_log("Call lua service method 'on_cancel' failed:" + error);
    return;
  }
  thread_manager_->remove(thread_ptr);
}

auto LuaServiceImpl::hotfix_file(const std::string &file_path) -> bool {
  int ret_code = 0;
  std::string error;
  auto result =
      LuaUtil::call_lua_function("hotfix_file", L_, ret_code, error, file_path);
  if (!result) {
    write_fail_log("Hotfix failed, file[" + file_path + "], error:" + error);
    return false;
  }
  if (ret_code) {
    return false;
  }
  return true;
}

auto LuaServiceImpl::hotfix_chunk(const std::string &chunk) -> bool {
  int ret_code = 0;
  std::string error;
  auto result = LuaUtil::call_lua_function("hotfix", L_, ret_code, error, chunk,
                                           "LuaService");
  if (!result) {
    write_fail_log("Hotfix failed, chunk[" + chunk + "], error:" + error);
    return false;
  }
  if (ret_code) {
    return false;
  }
  return true;
}

auto LuaServiceImpl::is_last_call_yield() -> bool {
  auto current = thread_manager_->get_current_thread();
  if (current) {
    return current->is_yield();
  }
  return false;
}

auto LuaServiceImpl::open_debugger(const std::string &name) -> void {
  if (lua_debug_) {
    return;
  }
  lua_debug_ = kratos::make_shared_pool_ptr<Debugger>(L_, name);
  lua_debug_->set_source_root(source_path_);
  if (box_) {
    debugger_name_ = name;
    box_->get_debug_server()->enable_machine(name, this, lua_debug_.get());
  }
  thread_manager_->set_debugger(lua_debug_.get());
}

auto LuaServiceImpl::close_debugger() -> void {
  if (box_ && box_->get_debug_server()) {
    box_->get_debug_server()->disable_machine(debugger_name_);
    debugger_name_ = "";
  }
  if (lua_debug_) {
    lua_debug_.reset();
  }
  thread_manager_->set_debugger(nullptr);
}

auto LuaServiceImpl::disable_debugger() -> void {
  if (box_ && box_->get_debug_server() && lua_debug_) {
    box_->get_debug_server()->disable_machine(debugger_name_);
  }
}

auto LuaServiceImpl::enable_debugger() -> void {
  if (box_ && box_->get_debug_server() && lua_debug_) {
    box_->get_debug_server()->enable_machine(debugger_name_, this,
                                             lua_debug_.get());
  }
}

auto LuaServiceImpl::reload() -> bool {
  if (lua_debug_) {
    lua_debug_->disable();
  }
  // 关闭
  if (!stop()) {
    return false;
  }
  // 重启
  return start(service_name_, service_uuid_, publisher_ptr_);
}

auto LuaServiceImpl::restart() -> bool {
  // 拷贝名字
  auto debugger_name = lua_debug_->get_name();
  auto breakpoints = lua_debug_->get_all_breakpoint();
  if (!stop()) {
    return false;
  }
  if (!start_internal()) {
    return false;
  }
  open_debugger(debugger_name);
  if (!lua_debug_) {
    return false;
  }
  lua_debug_->add_breakpoint(breakpoints, true);
  // 加载并运行入口脚本
  auto error = luaL_dofile(L_, lua_root_file_.c_str());
  if (error) {
    std::string error_string;
    if (!LuaUtil::catch_lua_error(L_, lua_status(L_), error_string)) {
      write_fatal_log(error_string);
      return false;
    }
  }
  // 调用服务初始化方法
  return call_after_fork();
}

auto LuaServiceImpl::get_thread_info() -> std::string {
  std::string content;
  for (const auto &[k, v] : thread_manager_->get_all_thread()) {
    content += std::to_string(k) + " " +
               std::to_string((std::uint64_t)v->get_lua_state());
    if (v->is_yield()) {
      content += " suspend\n";
    } else if (v->is_dead()) {
      content += " dead\n";
    } else if (v->is_ready()) {
      content += " running\n";
    }
  }
  return content;
}

auto LuaServiceImpl::get_log_history() -> kratos::service::LogHistory * {
  return log_history_.get();
}

auto LuaServiceImpl::write_log(int level, const char *log) -> void {
  if (!box_) {
    return;
  }
  box_->get_logger_appender()->write(level, "%s", log);
}

auto LuaServiceImpl::get_last_error() -> const std::string & {
  // 获取当前协程
  auto coroutine_ptr = thread_manager_->get_current_thread();
  if (!coroutine_ptr) {
    static std::string NullString;
    return NullString;
  }
  return coroutine_ptr->get_last_error();
}

auto LuaServiceImpl::module_update(std::time_t ms) -> void {
  lua_argument_->update(ms);
  lua_config_->update(ms);
  lua_redis_->update(ms);
  lua_time_->update(ms);
  lua_http_->update(ms);
  lua_console_->update(ms);
}

auto LuaServiceImpl::module_cleanup() -> void {
  lua_argument_->do_cleanup();
  lua_config_->do_cleanup();
  lua_redis_->do_cleanup();
  lua_time_->do_cleanup();
  lua_http_->do_cleanup();
  lua_console_->do_cleanup();
}

auto LuaServiceImpl::close_timer_ref() -> void {
  if (timer_ref_key_ != LUA_NOREF) {
    LuaUtil::remove_registry_key(L_, timer_ref_key_);
  }
}

auto LuaServiceImpl::close_sub_pub_ref() -> void {
  if (sub_pub_ref_key_ != LUA_NOREF) {
    LuaUtil::remove_registry_key(L_, sub_pub_ref_key_);
  }
}

void LuaServiceImpl::run_tick_once(std::time_t now) {
  auto thread_ptr = thread_manager_->new_lua_thread(0);
  thread_manager_->set_current_thread_id(thread_ptr->get_id());
  if (thread_ptr->call_no_ret(tick_func_name_, now)) {
    if (thread_ptr->is_yield()) {
      add_yield_thread(thread_ptr);
    } else {
      thread_manager_->remove(thread_ptr);
    }
  } else {
    thread_manager_->remove(thread_ptr);
  }
}

void LuaServiceImpl::install_global_function() {
  // 用于lua框架内部调用
  LuaUtil::register_function(L_, "kratos_try_get_proxy",
                             &LuaServiceImpl::lua_get_proxy_timeout);
  LuaUtil::register_function(L_, "kratos_get_proxy_from_peer",
                             &LuaServiceImpl::lua_get_proxy_from_peer);
  LuaUtil::register_function(L_, "kratos_get_proxy_from_transport",
                             &LuaServiceImpl::lua_get_proxy_from_transport);
  LuaUtil::register_function(L_, "kratos_call_proxy",
                             &LuaServiceImpl::lua_call_proxy_method);
  LuaUtil::register_function(L_, "kratos_register_service",
                             &LuaServiceImpl::lua_register_service);
  LuaUtil::register_function(L_, "kratos_unregister_service",
                             &LuaServiceImpl::lua_unregister_service);
  LuaUtil::register_function(L_, "kratos_start_timer",
                             &LuaServiceImpl::lua_start_timer);
  LuaUtil::register_function(L_, "kratos_start_periodic_timer",
                             &LuaServiceImpl::lua_start_periodic_timer);
  LuaUtil::register_function(L_, "kratos_cancel_timer",
                             &LuaServiceImpl::lua_cancel_timer);
  LuaUtil::register_function(L_, "kratos_shutdown",
                             &LuaServiceImpl::lua_shutdown);
  LuaUtil::register_function(L_, "kratos_log", &LuaServiceImpl::lua_log);
  LuaUtil::register_function(L_, "kratos_remove_proxy",
                             &LuaServiceImpl::lua_remove_proxy);
  LuaUtil::register_function(L_, "__kratos_sleep", &LuaServiceImpl::lua_sleep);
  LuaUtil::register_function(L_, "__kratos_current_thread_id",
                             &LuaServiceImpl::lua_current_thread_id);
  LuaUtil::register_function(L_, "kratos_subscribe",
                             &LuaServiceImpl::lua_subscribe);
  LuaUtil::register_function(L_, "kratos_cancel", &LuaServiceImpl::lua_cancel);
  LuaUtil::register_function(L_, "kratos_publish",
                             &LuaServiceImpl::lua_publish);
}

bool LuaServiceImpl::call_after_fork() {
  bool call_ret = false;
  std::string error;
  auto thread_ptr = thread_manager_->new_lua_thread(0);
  thread_manager_->set_current_thread_id(thread_ptr->get_id());
  auto run_ret = LuaUtil::call_lua_function(
      after_fork_func_name_, thread_ptr->get_lua_state(), call_ret, error);
  if (thread_ptr->is_yield()) {
    add_yield_thread(thread_ptr);
    return true;
  }
  if (!call_ret || !run_ret) {
    thread_manager_->remove(thread_ptr);
    write_fatal_log("Call lua service method 'on_after_fork' failed:" + error);
    return false;
  }
  thread_manager_->remove(thread_ptr);
  return true;
}

service::ServiceBox *LuaServiceImpl::get_box() { return box_; }

auto LuaServiceImpl::write_fail_log(const std::string &error_string) -> void {
  if (box_) {
    box_->write_log(lang::LangID::LANG_LUA_ERROR, Logger::FAILURE,
                    ("[" + service_name_ + "]" + error_string).c_str());
  }
}

auto LuaServiceImpl::write_fatal_log(const std::string &error_string) -> void {
  if (box_) {
    box_->write_log(lang::LangID::LANG_LUA_ERROR, Logger::FATAL,
                    ("[" + service_name_ + "]" + error_string).c_str());
  }
}

auto LuaServiceImpl::install_hotfix() -> bool {
  auto error = luaL_dostring(L_, get_hotfix_lua_string());
  if (error) {
    std::string error_string;
    if (!LuaUtil::catch_lua_error(L_, lua_status(L_), error_string)) {
      write_fatal_log(error_string);
      return false;
    }
  }
  return true;
}

auto LuaServiceImpl::install_context() -> bool {
  auto error = luaL_dostring(L_, get_module_lua_string());
  if (error) {
    std::string error_string;
    if (!LuaUtil::catch_lua_error(L_, lua_status(L_), error_string)) {
      write_fatal_log(error_string);
      return false;
    }
  }
  return true;
}

auto LuaServiceImpl::install_proxy(const std::string &proxy_root) -> bool {
  std::vector<std::string> lua_files;
  util::get_file_in_directory(proxy_root, ".lua", lua_files);
  for (const auto &file : lua_files) {
    auto error = luaL_dofile(L_, file.c_str());
    if (error) {
      std::string error_string;
      if (!LuaUtil::catch_lua_error(L_, lua_status(L_), error_string)) {
        write_fatal_log(error_string);
        return false;
      }
    }
  }
  return true;
}

auto LuaServiceImpl::install_timer_table() -> bool {
  timer_ref_key_ = LuaUtil::new_registry_table(L_);
  return true;
}

auto LuaServiceImpl::install_sub_pub_table() -> bool {
  sub_pub_ref_key_ = LuaUtil::new_registry_table(L_);
  return true;
}

/**
 * @brief 建立并初始化一个模块
 * @tparam T 模块实现类
 * @param box 服务容器
 * @param L lua虚拟机
 * @param service lua服务
 * @return 模块实例
 */
template <typename T>
kratos::unique_pool_ptr<T> create_module(kratos::service::ServiceBox *box,
                                         lua_State *L,
                                         LuaServiceImpl *service) {
  auto ptr = kratos::make_unique_pool_ptr<T>(box, L, service);
  if (!ptr->do_register()) {
    return nullptr;
  }
  return ptr;
}

auto LuaServiceImpl::install_modules() -> bool {
  lua_argument_ = create_module<LuaArgument>(box_, L_, this);
  lua_config_ = create_module<LuaConfig>(box_, L_, this);
  lua_redis_ = create_module<LuaRedis>(box_, L_, this);
  lua_time_ = create_module<LuaTime>(box_, L_, this);
  lua_http_ = create_module<LuaHttp>(box_, L_, this);
  lua_console_ = create_module<LuaConsole>(box_, L_, this);
  return true;
}

auto LuaServiceImpl::add_timer_lua_func(lua_State *vm, util::TimerID timer_id)
    -> bool {
  auto ret = lua_rawgeti(vm, LUA_REGISTRYINDEX, timer_ref_key_);
  if (!ret) {
    // 建立失败
    return false;
  }
  LuaUtil::lua_push(vm, (std::uint64_t)timer_id);
  lua_pushvalue(vm, -3);
  lua_settable(vm, -3);
  lua_pop(vm, 1);
  return true;
}

auto LuaServiceImpl::add_sub_pub_lua_func(lua_State *vm,
                                          const std::string evt_name,
                                          rpc::ProxyID proxy_id,
                                          const std::string &data) -> bool {
  auto it = proxy_sub_info_map_.find(proxy_id);
  if (it != proxy_sub_info_map_.end()) {
    for (const auto &sub_info : it->second) {
      // 检查重名
      if (sub_info.evt_name == evt_name) {
        write_fail_log("Event name duplicated[" + evt_name + "]");
        return false;
      }
    }
  }
  if (!lua_isfunction(vm, -1)) {
    return false;
  }
  auto ret = lua_rawgeti(vm, LUA_REGISTRYINDEX, sub_pub_ref_key_);
  if (!ret) {
    // 建立失败
    write_fail_log("'lua_rawgeti' failed");
    return false;
  }
  auto sub_pb_cb_id = pub_sub_cb_id_++;
  LuaUtil::lua_push(vm, sub_pb_cb_id);
  lua_pushvalue(vm, -3);
  lua_settable(vm, -3);
  lua_pop(vm, 1);
  // 记录事件名与回调ID的对应关系
  proxy_sub_info_map_[proxy_id].emplace_back(
      SubInfo{sub_pb_cb_id, evt_name, proxy_id});
  auto *rpc_ptr = box_->get_rpc();
  auto prx_ptr = rpc_ptr->getProxyManager()->get(proxy_id);
  if (!prx_ptr) {
    write_fail_log("Proxy not found to subscribe event[" + evt_name + "]");
    return false;
  }
  if (!prx_ptr->getSubscriber()->subscribe(evt_name, data.c_str(),
                                           data.size())) {
    return false;
  }
  return true;
}

auto LuaServiceImpl::remove_time_lua_func(lua_State *vm, util::TimerID timer_id)
    -> void {
  lua_rawgeti(vm, LUA_REGISTRYINDEX, timer_ref_key_);
  LuaUtil::lua_push(vm, (std::uint64_t)timer_id);
  lua_pushnil(vm);
  lua_settable(vm, -3);
  lua_pop(vm, 1);
}

auto LuaServiceImpl::remove_sub_pub_lua_func(lua_State *vm,
                                             const std::string &evt_name,
                                             rpc::ProxyID proxy_id) -> void {
  auto it = proxy_sub_info_map_.find(proxy_id);
  if (it == proxy_sub_info_map_.end()) {
    return;
  }
  SubInfo sub_info;
  for (auto sub_info_it = it->second.begin(); sub_info_it != it->second.end();
       sub_info_it++) {
    if (sub_info_it->evt_name == evt_name) {
      sub_info = *sub_info_it;
      it->second.erase(sub_info_it);
      break;
    }
  }
  if (it->second.empty()) {
    proxy_sub_info_map_.erase(it);
  }
  if (!sub_info.cb_id) {
    return;
  }
  lua_rawgeti(vm, LUA_REGISTRYINDEX, sub_pub_ref_key_);
  LuaUtil::lua_push(vm, sub_info.cb_id);
  lua_pushnil(vm);
  lua_settable(vm, -3);
  auto prx_ptr = box_->get_rpc()->getProxyManager()->get(sub_info.proxy_id);
  if (prx_ptr) {
    prx_ptr->getSubscriber()->cancel(evt_name);
  }
  lua_pop(vm, 1);
}

auto LuaServiceImpl::thread_call(int reg_key, const std::string &name) -> void {
  auto thread_ptr = thread_manager_->new_lua_thread(0);
  auto *vm = thread_ptr->get_lua_state();
  auto ret = lua_rawgeti(vm, LUA_REGISTRYINDEX, reg_key);
  if (!ret) {
    // 获取表失败
    thread_manager_->remove(thread_ptr);
    return;
  }
  LuaUtil::lua_push(vm, name);
  lua_gettable(vm, -2);
  LuaUtil::lua_push(vm, name);
  if (!thread_ptr->resume(1)) {
    thread_manager_->remove(thread_ptr);
    return;
  }
  if (thread_ptr->is_yield()) {
    // 处理函数内出让
    add_yield_thread(thread_ptr);
  } else {
    // 处理完成
    thread_manager_->remove(thread_ptr);
  }
  lua_pop(vm, 1);
}

auto LuaServiceImpl::get_thread_manager() -> ThreadManager * {
  return thread_manager_.get();
}

auto LuaServiceImpl::timer_func(util::TimerID timer_id,
                                std::uint64_t /*user_data*/) -> bool {
  auto thread_ptr = thread_manager_->new_lua_thread(0);
  auto *vm = thread_ptr->get_lua_state();
  auto ret = lua_rawgeti(vm, LUA_REGISTRYINDEX, timer_ref_key_);
  if (!ret) {
    // 获取表失败
    thread_manager_->remove(thread_ptr);
    return 0;
  }
  LuaUtil::lua_push(vm, (std::uint64_t)timer_id);
  lua_gettable(vm, -2);
  LuaUtil::lua_push(vm, (std::uint64_t)timer_id);
  if (!thread_ptr->resume(1)) {
    thread_manager_->remove(thread_ptr);
    remove_time_lua_func(L_, timer_id);
    return false;
  }
  if (thread_ptr->is_yield()) {
    // 处理函数内出让
    add_yield_thread(thread_ptr);
  } else {
    // 处理完成
    thread_manager_->remove(thread_ptr);
  }
  lua_pop(vm, 1);
  remove_time_lua_func(L_, timer_id);
  return true;
}

auto LuaServiceImpl::periodic_timer_func(util::TimerID timer_id,
                                         std::uint64_t /*user_data*/) -> bool {
  auto thread_ptr = thread_manager_->new_lua_thread(0);
  auto *vm = thread_ptr->get_lua_state();
  auto ret = lua_rawgeti(vm, LUA_REGISTRYINDEX, timer_ref_key_);
  if (!ret) {
    // 获取表失败
    thread_manager_->remove(thread_ptr);
    return 0;
  }
  LuaUtil::lua_push(vm, (std::uint64_t)timer_id);
  lua_gettable(vm, -2);
  LuaUtil::lua_push(vm, (std::uint64_t)timer_id);
  if (!thread_ptr->resume(1)) {
    thread_manager_->remove(thread_ptr);
    remove_time_lua_func(L_, timer_id);
    return false;
  }
  if (thread_ptr->is_yield()) {
    // 处理函数内出让
    add_yield_thread(thread_ptr);
  } else {
    // 处理完成
    thread_manager_->remove(thread_ptr);
  }
  return true;
}

auto LuaServiceImpl::wakeup_func(util::TimerID /*timer_id*/,
                                 std::uint64_t user_data) -> bool {
  // 获取需要唤醒的协程
  auto thread_ptr = thread_manager_->get((ThreadID)user_data);
  if (thread_ptr) {
    // 唤醒
    if (!thread_ptr->resume(0)) {
      return false;
    }
    if (thread_ptr->is_yield()) {
      add_yield_thread(thread_ptr);
    }
  }
  return true;
}

auto LuaServiceImpl::query_proxy_timer_func(util::TimerID /*timer_id*/,
                                            std::uint64_t user_data) -> bool {
  auto thread_ptr = thread_manager_->get((ThreadID)user_data);
  if (!thread_ptr) {
    return true;
  }
  if (!thread_ptr->resume(0)) {
    return false;
  }
  if (thread_ptr->is_yield()) {
    add_yield_thread(thread_ptr);
  }
  return true;
}

auto LuaServiceImpl::publish(const std::string &evt_name,
                             const std::string &data) -> void {
  if (publisher_ptr_) {
    publisher_ptr_->publish(evt_name, data.c_str(), data.size());
  }
}

auto LuaServiceImpl::add_yield_thread(ThreadPtr thread_ptr) -> bool {
  timer_wheel_->schedule(
      [&](util::TimerID /*timer_id*/, std::uint64_t user_data) -> bool {
        auto thread_id = (ThreadID)user_data;
        auto yield_thread = thread_manager_->get(thread_id);
        if (!yield_thread) {
          return false;
        }
        // 协程未被释放同时还处于ThreadState::READY状态或ThreadState::DEAD状态
        // 需要定时器释放
        if (yield_thread->is_ready() || yield_thread->is_dead()) {
          thread_manager_->remove(yield_thread);
          return false;
        }
        return true;
      },
      CHECK_YIELD_THREAD_INTVAL, thread_ptr->get_id());
  return true;
}

auto LuaServiceImpl::call_sub_event_cb(rpc::ProxyID proxy_id,
                                       const std::string &evt_name,
                                       const std::string &data) -> void {
  auto it = proxy_sub_info_map_.find(proxy_id);
  if (it == proxy_sub_info_map_.end()) {
    return;
  }
  int cb_id{0};
  for (const auto& sub_info : it->second) {
    if (sub_info.evt_name == evt_name) {
      cb_id = sub_info.cb_id;
      break;
    }
  }
  if (!cb_id) {
    // 回调ID未找到
    return;
  }
  auto thread_ptr = thread_manager_->new_lua_thread(0);
  auto *vm = thread_ptr->get_lua_state();
  auto ret = lua_rawgeti(vm, LUA_REGISTRYINDEX, sub_pub_ref_key_);
  if (!ret) {
    // 获取表失败
    thread_manager_->remove(thread_ptr);
    write_fail_log("'lua_rawgeti' failed");
    return;
  }
  //
  // 获取回调函数并压栈
  //
  LuaUtil::lua_push(vm, cb_id);
  // 获取回到函数并压栈
  lua_gettable(vm, -2);
  // 压入参数, 事件名, 事件附带数据
  LuaUtil::lua_push(vm, evt_name);
  LuaUtil::lua_push(vm, data);
  // 调用回调
  if (!thread_ptr->resume(2)) {
    thread_manager_->remove(thread_ptr);
    return;
  }
  if (thread_ptr->is_yield()) {
    // 处理函数内出让
    add_yield_thread(thread_ptr);
  } else {
    // 处理完成
    thread_manager_->remove(thread_ptr);
  }
}

auto LuaServiceImpl::remove_all_sub_event(rpc::ProxyID proxy_id) -> void {
  auto it = proxy_sub_info_map_.find(proxy_id);
  if (it == proxy_sub_info_map_.end()) {
    return;
  }
  SubInfoList sub_info_list(it->second);
  for (const auto &info : sub_info_list) {
    remove_sub_pub_lua_func(L_, info.evt_name, info.proxy_id);
  }
}

auto LuaServiceImpl::install_stub(const std::string &stub_root) -> bool {
  std::vector<std::string> lua_files;
  util::get_file_in_directory(stub_root, ".lua", lua_files);
  for (const auto &file : lua_files) {
    auto error = luaL_dofile(L_, file.c_str());
    std::string error_string;
    if (!LuaUtil::catch_lua_error(L_, error, error_string)) {
      write_fatal_log(error_string);
      return false;
    }
  }
  return true;
}

auto LuaServiceImpl::hotfix_checker_main() -> void {
  //
  // 获取所有lua文件
  //
  std::vector<std::string> files;
  std::string lua_root_dir = lua_root_dir_;
  util::get_file_in_directory_recursive(lua_root_dir, ".lua", files);
  using ModMap = service::PoolUnorederedMap<std::string, std::time_t>;
  ModMap mod_map;
  for (const auto &file : files) {
    mod_map.emplace(file, util::get_last_modify_time(file));
  }
  while (checker_running_) {
    files.clear();
    util::get_file_in_directory_recursive(lua_root_dir, ".lua", files);
    for (const auto &file : files) {
      auto mod_time = util::get_last_modify_time(file);
      auto it = mod_map.find(file);
      if (it == mod_map.end()) {
        //
        // 只检测旧文件, 新增文件忽略
        //
        continue;
      } else {
        if (it->second != mod_time) {
          it->second = mod_time;
          //
          // 通知主线程文件变化通知
          //
          hotfix_queue_.enqueue(file);
        }
      }
    }
    //
    // 睡眠指定间隔
    //
    std::this_thread::sleep_for(std::chrono::seconds(hotfix_check_second_));
  }
}

LuaServiceImpl *LuaServiceImpl::get_lua_service(lua_State *l) {
  LuaUtil::get_registry_value(l, "service");
  return LuaUtil::pop_ptr_value<LuaServiceImpl *>(l);
}

int LuaServiceImpl::lua_register_service(lua_State *l) {
  LuaUtil::BoolPusher pusher(l);
  auto *service = get_lua_service(l);
  if (!service || !lua_isstring(l, -1)) {
    return pusher.return_value();
  }
  auto service_name = LuaUtil::pop_value<std::string>(l);
  if (service->get_box()->register_service(service_name)) {
    return pusher.return_value(true);
  } else {
    return pusher.return_value(false);
  }
}

int LuaServiceImpl::lua_unregister_service(lua_State *l) {
  LuaUtil::BoolPusher pusher(l);
  auto *service = get_lua_service(l);
  if (!service || !lua_isstring(l, -1)) {
    return pusher.return_value();
  }
  auto service_name = LuaUtil::pop_value<std::string>(l);
  if (service->get_box()->unregister_service(service_name)) {
    return pusher.return_value(true);
  } else {
    return pusher.return_value(false);
  }
}

int LuaServiceImpl::lua_call_proxy_method(lua_State *l) {
  auto *service = get_lua_service(l);
  if (!service) {
    return 0;
  }
  if (!lua_istable(l, -1) || !lua_isinteger(l, -2) || !lua_isinteger(l, -3) ||
      !lua_isinteger(l, -4)) {
    service->write_fail_log("Invalid argument");
    return 0;
  }
  auto proxy_id = (rpc::ProxyID)lua_tointeger(l, -2);
  auto method_id = (rpc::MethodID)lua_tointeger(l, -3);
  auto service_id = (rpc::ServiceID)lua_tointeger(l, -4);

  auto it = service->proxy_uuid_map_.find(proxy_id);
  if (it == service->proxy_uuid_map_.end()) {
    service->write_fail_log("Proxy not found, proxy ID[" +
                            std::to_string(proxy_id) + "]");
    return 0;
  }
  auto service_uuid = it->second;
  // 当前参数栈顶为调用参数
  return service->thread_manager_->lua_call_proxy_method(
      service_uuid, service_id, proxy_id, method_id);
}

int LuaServiceImpl::lua_subscribe(lua_State *l) {
  LuaUtil::NilPusher pusher(l);
  auto *service = get_lua_service(l);
  if (!service) {
    return pusher.return_value(false);
  }
  if (!service || !lua_isfunction(l, -1) || !lua_isstring(l, -2) ||
      !lua_isstring(l, -3) || !lua_isinteger(l, -4)) {
    service->write_fail_log("[lua]Invalid argument for ctx:kratos_subscribe");
    return pusher.return_value(false);
  }
  auto proxy_id = (rpc::ProxyID)lua_tointeger(l, -4);
  const auto *evt_name = lua_tostring(l, -3);
  auto data = std::string(lua_tostring(l, -2));
  if (!service->add_sub_pub_lua_func(l, evt_name, proxy_id, data)) {
    return pusher.return_value(false);
  }
  return pusher.return_value(true);
}

int LuaServiceImpl::lua_cancel(lua_State *l) {
  auto *service = get_lua_service(l);
  if (!service) {
    return 0;
  }
  if (!service || !lua_isstring(l, -1) || !lua_isinteger(l, -2)) {
    service->write_fail_log("[lua]Invalid argument for ctx:kratos_cancel");
    return 0;
  }
  const auto *evt_name = lua_tostring(l, -1);
  auto proxy_id = (rpc::ProxyID)lua_tointeger(l, -2);
  service->remove_sub_pub_lua_func(l, evt_name, proxy_id);
  return 0;
}

int LuaServiceImpl::lua_publish(lua_State *l) {
  auto *service = get_lua_service(l);
  if (!service) {
    return 0;
  }
  if (!service || !lua_isstring(l, -1) || !lua_isstring(l, -2)) {
    service->write_fail_log("[lua]Invalid argument for ctx:kratos_publish");
    return 0;
  }
  const auto *evt_name = lua_tostring(l, -2);
  const auto *data = lua_tostring(l, -1);
  service->publish(evt_name, data);
  return 0;
}

int LuaServiceImpl::lua_start_timer(lua_State *l) {
  LuaUtil::NilPusher pusher(l);
  auto *service = get_lua_service(l);
  if (!service || !lua_isfunction(l, -1) || !lua_isinteger(l, -2)) {
    service->write_fail_log("[lua]Invalid argument for ctx:expire_at");
    return pusher.return_value();
  }
  // 定时器间隔
  auto intval = (int)lua_tointeger(l, -2);
  if (intval <= 0) {
    service->write_fail_log("[lua]Invalid argument for ctx:expire_at");
    return pusher.return_value();
  }
  auto timer_id = service->timer_wheel_->schedule_once(
      std::bind(&LuaServiceImpl::timer_func, service, std::placeholders::_1,
                std::placeholders::_2),
      (std::time_t)intval, 0);
  if (!timer_id) {
    return pusher.return_value();
  }
  if (!service->add_timer_lua_func(l, timer_id)) {
    service->timer_wheel_->cancel(timer_id);
    return pusher.return_value();
  }
  // 返回定时器ID
  return pusher.return_value((std::uint64_t)timer_id);
}

int LuaServiceImpl::lua_start_periodic_timer(lua_State *l) {
  LuaUtil::NilPusher pusher(l);
  auto *service = get_lua_service(l);
  if (!service) {
    return pusher.return_value();
  }
  if (!lua_isfunction(l, -1) || !lua_isinteger(l, -2)) {
    service->write_fail_log("[lua]Invalid argument for ctx:expire_periodic");
    return pusher.return_value();
  }
  // 定时器间隔
  auto intval = (int)lua_tointeger(l, -2);
  if (intval <= 0) {
    service->write_fail_log("[lua]Invalid argument for ctx:expire_periodic");
    return pusher.return_value();
  }
  auto timer_id = service->timer_wheel_->schedule(
      std::bind(&LuaServiceImpl::periodic_timer_func, service,
                std::placeholders::_1, std::placeholders::_2),
      (std::time_t)intval, 0);
  if (!timer_id) {
    return pusher.return_value();
  }
  if (!service->add_timer_lua_func(l, timer_id)) {
    service->timer_wheel_->cancel(timer_id);
    return pusher.return_value();
  }
  // 返回定时器ID
  return pusher.return_value((std::uint64_t)timer_id);
}

int LuaServiceImpl::lua_sleep(lua_State *l) {
  auto *service = get_lua_service(l);
  if (!service) {
    return 0;
  }
  if (!lua_isinteger(l, -1)) {
    service->write_fail_log("[lua]Invalid argument for ctx:sleep");
    return 0;
  }
  // 定时器间隔
  auto intval = LuaUtil::pop_value<int>(l);
  if (intval <= 0) {
    service->write_fail_log("[lua]Invalid argument for ctx:sleep");
    // 定时器间隔必须大于零
    return 0;
  }
  auto timer_id = service->timer_wheel_->schedule_once(
      std::bind(&LuaServiceImpl::wakeup_func, service, std::placeholders::_1,
                std::placeholders::_2),
      (std::time_t)intval, service->thread_manager_->get_current_thread_id());
  if (!timer_id) {
    service->write_fail_log("Fork new timer failed");
  }
  return 0;
}

int LuaServiceImpl::lua_current_thread_id(lua_State *l) {
  LuaUtil::NilPusher pusher(l);
  auto *service = get_lua_service(l);
  if (!service) {
    return pusher.return_value();
  }
  return pusher.return_value(service->thread_manager_->get_current_thread_id());
}

int LuaServiceImpl::lua_cancel_timer(lua_State *l) {
  auto *service = get_lua_service(l);
  if (!service) {
    return 0;
  }
  if (!lua_isinteger(l, -1)) {
    service->write_fail_log("[lua]Invalid argument for ctx:cancel");
    return 0;
  }
  auto timer_id = LuaUtil::pop_value<util::TimerID>(l);
  service->timer_wheel_->cancel(timer_id);
  return 0;
}

int LuaServiceImpl::lua_get_proxy_timeout(lua_State *l) {
  LuaUtil::NilPusher pusher(l);
  auto *service = get_lua_service(l);
  if (!service) {
    return pusher.return_value();
  }
  if (!lua_isinteger(l, -1) || !lua_isinteger(l, -2) || !lua_isstring(l, -3)) {
    service->write_fail_log(
        "Invalid argument for method ctx:get_proxy_timeout");
    return pusher.return_value();
  }
  auto query_timeout = LuaUtil::pop_value<std::time_t>(l);
  auto service_uuid = LuaUtil::pop_value<rpc::ServiceUUID>(l);
  auto service_name = LuaUtil::pop_value<std::string>(l);
  auto &thread_manager = service->thread_manager_;
  auto current_thread_id = thread_manager->get_current_thread_id();
  auto &timer_wheel = service->timer_wheel_;
  auto proxy_id = thread_manager->try_get_proxy_id(service_uuid, service_name);
  if (proxy_id == rpc::INVALID_PROXY_ID) {
    service->write_fail_log("Try get proxy failed, UUID[" +
                            std::to_string(service_uuid) + "], service name[" +
                            service_name + "]");
    if (query_timeout) {
      //  启动定时器并记录正在等待的协程
      auto timer_id = timer_wheel->schedule_once(
          std::bind(&LuaServiceImpl::query_proxy_timer_func, service,
                    std::placeholders::_1, std::placeholders::_2),
          query_timeout, current_thread_id);
      if (!timer_id) {
        service->write_fail_log("Create timer failed, UUID[" +
                                std::to_string(service_uuid) +
                                "], service name[" + service_name + "]");
      }
    }
    return pusher.return_value();
  }
  service->proxy_uuid_map_[proxy_id] = service_uuid;
  return pusher.return_value(proxy_id);
}

int LuaServiceImpl::lua_get_proxy_from_peer(lua_State *l) {
  LuaUtil::NilPusher pusher(l);
  auto *service = get_lua_service(l);
  if (!service) {
    return pusher.return_nil(3);
  }

  if (!lua_isinteger(l, -1)) {
    service->write_fail_log(
        "Invalid argument for method ctx:lua_get_proxy_from_peer");
    return pusher.return_value();
  }
  auto service_uuid = LuaUtil::pop_value<rpc::ServiceUUID>(l);

  auto thread_ptr = service->thread_manager_->get_current_thread();
  if (!thread_ptr) {
    return pusher.return_nil(3);
  }
  auto &stub_call_ptr = thread_ptr->get_stub_call();
  if (!stub_call_ptr) {
    service->write_fail_log(
        "Try get proxy from peer failed, cannot find current call");
    return pusher.return_nil(3);
  }
  auto proxy_id = service->thread_manager_->try_get_proxy_id(
      service_uuid, stub_call_ptr->getGlobalIndex(),
      stub_call_ptr->getTransport());
  if (proxy_id == rpc::INVALID_PROXY_ID) {
    return pusher.return_nil(3);
  }
  service->proxy_uuid_map_[proxy_id] = service_uuid;
  return pusher.return_value(stub_call_ptr->getGlobalIndex(), proxy_id,
                             service_uuid);
}

int LuaServiceImpl::lua_get_proxy_from_transport(lua_State *l) {
  LuaUtil::NilPusher pusher(l);
  auto *service = get_lua_service(l);
  if (!service) {
    return pusher.return_nil(2);
  }

  if (!lua_isinteger(l, -1)) {
    service->write_fail_log(
        "Invalid argument for method ctx:lua_get_proxy_from_transport");
    return pusher.return_value();
  }
  auto service_uuid = LuaUtil::pop_value<rpc::ServiceUUID>(l);

  auto thread_ptr = service->thread_manager_->get_current_thread();
  if (!thread_ptr) {
    return pusher.return_nil(2);
  }
  auto &stub_call_ptr = thread_ptr->get_stub_call();
  if (!stub_call_ptr) {
    service->write_fail_log(
        "Try get proxy from peer failed, cannot find current call");
    return pusher.return_nil(2);
  }
  auto proxy_id = service->thread_manager_->try_get_proxy_id(
      service_uuid, stub_call_ptr->getTransport());
  if (proxy_id == rpc::INVALID_PROXY_ID) {
    return pusher.return_nil(2);
  }
  service->proxy_uuid_map_[proxy_id] = service_uuid;
  return pusher.return_value(proxy_id, service_uuid);
}

int LuaServiceImpl::lua_log(lua_State *l) {
  auto *service = get_lua_service(l);
  if (!service) {
    return 0;
  }
  auto argc = lua_gettop(l);
  if (argc != 2) {
    service->write_fail_log("Invalid argument for method ctx:log");
    return 0;
  }
  std::string line;
  if (!Variable::to_value_string(l, -1, line)) {
    service->write_fail_log("Invalid argument for method ctx:log");
    return 0;
  }
  if (!lua_isinteger(l, -2)) {
    service->write_fail_log("Invalid argument for method ctx:log");
    return 0;
  }
  auto level = (int)lua_tointeger(l, -2);
  std::string log_line("[lua][" + service->service_name_ + "]");
  log_line += std::move(line);
  if (service->log_history_) {
    service->log_history_->log(log_line.c_str());
  }
  service->write_log(level, log_line.c_str());
  return 0;
}

int LuaServiceImpl::lua_shutdown(lua_State *l) {
  auto *service = get_lua_service(l);
  if (!service) {
    return 0;
  }
  service->box_->set_wait_stop_flag();
  return 0;
}

int LuaServiceImpl::lua_remove_proxy(lua_State *l) {
  auto *service = get_lua_service(l);
  if (!service) {
    return 0;
  }
  if (!lua_isinteger(l, -1)) {
    service->write_fail_log("Invalid argument for method ctx:remove_proxy");
    return 0;
  }
  auto proxy_id = (rpc::ProxyID)lua_tointeger(l, -1);
  service->get_thread_manager()->remove_proxy(proxy_id);
  return 0;
}

MsgFactory::MsgFactory(service::ServiceBox *box) {
  box_ = box;
  factory_ = make_unique_pool_ptr<ProtobufDynamicMessageFactory>();
}

MsgFactory::~MsgFactory() {
  msg_factory_map_.clear();
  factory_.reset();
  importer_.reset();
}

MethodType::MethodType(const ProtobufDescriptor *request_descriptor,
                       const ProtobufDescriptor *response_descriptor,
                       bool is_oneway, const std::string &service_name,
                       const std::string &service_method_name, int call_timeout,
                       ProtobufMessage *request, ProtobufMessage *response,
                       const std::string &uuid, bool has_ret_value) noexcept
    : request_message_descriptor(request_descriptor),
      response_message_descriptor(response_descriptor), oneway(is_oneway),
      service_name(service_name), method_name(service_method_name),
      timeout(call_timeout), request_message(request),
      response_message(response), uuid_string(uuid),
      has_ret_value(has_ret_value) {
  lua_real_method_name = "_" + uuid + "_" + method_name;
}

MethodType::MethodType(const MethodType &rht) noexcept
    : request_message_descriptor(rht.request_message_descriptor),
      response_message_descriptor(rht.response_message_descriptor),
      oneway(rht.oneway), service_name(rht.service_name),
      method_name(rht.method_name), timeout(rht.timeout),
      request_message(rht.request_message),
      response_message(rht.response_message), uuid_string(rht.uuid_string),
      lua_real_method_name(rht.lua_real_method_name),
      has_ret_value(rht.has_ret_value) {}

MethodType::MethodType(MethodType &&rht) noexcept
    : request_message_descriptor(rht.request_message_descriptor),
      response_message_descriptor(rht.response_message_descriptor),
      oneway(rht.oneway), service_name(std::move(rht.service_name)),
      method_name(std::move(rht.method_name)), timeout(rht.timeout),
      request_message(std::move(rht.request_message)),
      response_message(std::move(rht.response_message)),
      uuid_string(std::move(rht.uuid_string)),
      lua_real_method_name(std::move(rht.lua_real_method_name)),
      has_ret_value(rht.has_ret_value) {}

bool MethodType::has_retval() const {
  return (!oneway && (response_message != nullptr));
}

} // namespace lua
} // namespace kratos
